From 08a21001b5f27088a412fbe7a6fba8d558eade40 Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Sat, 13 Aug 2022 17:10:36 +0200 Subject: [PATCH] Use redis-client as transport `main` and `distributed` test suites pass. `cluster` and `sentinel` still need some updating --- .rubocop.yml | 3 + CHANGELOG.md | 3 + Gemfile | 1 + benchmarking/logging.rb | 74 -- lib/redis.rb | 89 +-- lib/redis/client.rb | 682 ++---------------- lib/redis/cluster.rb | 30 +- lib/redis/commands/connection.rb | 7 +- lib/redis/commands/hyper_log_log.rb | 2 +- lib/redis/commands/server.rb | 10 +- lib/redis/commands/transactions.rb | 32 +- lib/redis/connection.rb | 11 - lib/redis/connection/command_helper.rb | 41 -- lib/redis/connection/hiredis.rb | 68 -- lib/redis/connection/registry.rb | 13 - lib/redis/connection/ruby.rb | 435 ----------- lib/redis/distributed.rb | 8 +- lib/redis/errors.rb | 6 + lib/redis/pipeline.rb | 213 ++---- lib/redis/subscribe.rb | 14 +- redis.gemspec | 2 + .../commands_requiring_clustering_test.rb | 2 +- test/distributed/distributed_test.rb | 6 +- test/distributed/internals_test.rb | 6 +- test/distributed/transactions_test.rb | 34 - test/helper.rb | 10 +- test/lint/authentication.rb | 2 +- test/lint/hashes.rb | 4 +- test/lint/streams.rb | 20 +- test/lint/strings.rb | 2 +- test/redis/blocking_commands_test.rb | 1 + test/redis/client_test.rb | 18 +- test/redis/command_map_test.rb | 29 - test/redis/connection_handling_test.rb | 27 +- test/redis/connection_test.rb | 31 +- test/redis/error_replies_test.rb | 11 - test/redis/fork_safety_test.rb | 8 +- test/redis/internals_test.rb | 99 +-- test/redis/pipelining_commands_test.rb | 29 +- test/redis/publish_subscribe_test.rb | 20 +- test/redis/ssl_test.rb | 8 +- test/redis/transactions_test.rb | 30 +- test/redis/url_param_test.rb | 58 +- test/support/redis_mock.rb | 2 +- 44 files changed, 278 insertions(+), 1923 deletions(-) delete mode 100644 benchmarking/logging.rb delete mode 100644 lib/redis/connection.rb delete mode 100644 lib/redis/connection/command_helper.rb delete mode 100644 lib/redis/connection/hiredis.rb delete mode 100644 lib/redis/connection/registry.rb delete mode 100644 lib/redis/connection/ruby.rb delete mode 100644 test/redis/command_map_test.rb diff --git a/.rubocop.yml b/.rubocop.yml index 81f7e6c91..93cb2964f 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -139,6 +139,9 @@ Metrics/BlockNesting: Style/HashTransformValues: Enabled: false +Style/TrailingCommaInHashLiteral: + Enabled: false + Style/SymbolProc: Exclude: - 'test/**/*' diff --git a/CHANGELOG.md b/CHANGELOG.md index 39800b776..2dbb5e23e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ # Unreleased 5.0.0 +- `select` no longer record the current database. If the client has to reconnect after `select` was used, it will reconnect to the original database. +- Removed `logger` option. +- Removed `reconnect_delay_max` and `reconnect_delay`, you can pass precise sleep durations to `reconnect_attempts` instead. - Require Ruby 2.5+. - Removed the deprecated `queue` and `commit` methods. Use `pipelined` instead. - Removed the deprecated `pipelined` and `multi` signature. Commands now MUST be called on the block argument, not the original redis instance. diff --git a/Gemfile b/Gemfile index ef1ffde60..27b4a549d 100644 --- a/Gemfile +++ b/Gemfile @@ -9,3 +9,4 @@ gem 'rake' gem 'rubocop', '~> 1.25.1' gem 'mocha' gem 'hiredis' +gem 'redis-client', github: 'redis-rb/redis-client' # RESP2 supportnot yet released diff --git a/benchmarking/logging.rb b/benchmarking/logging.rb deleted file mode 100644 index 7b09a4609..000000000 --- a/benchmarking/logging.rb +++ /dev/null @@ -1,74 +0,0 @@ -# frozen_string_literal: true - -# Run with -# -# $ ruby -Ilib benchmarking/logging.rb -# - -begin - require "bench" -rescue LoadError - warn "`gem install bench` and try again." - exit 1 -end - -require "redis" -require "logger" - -def log(level, namespace = nil) - logger = (namespace || Kernel).const_get(:Logger).new("/dev/null") - logger.level = (namespace || Logger).const_get(level) - logger -end - -def stress(redis) - redis.flushdb - - n = (ARGV.shift || 2000).to_i - - n.times do |i| - key = "foo:#{i}" - redis.set key, i - redis.get key - end -end - -default = Redis.new - -logging_redises = [ - Redis.new(logger: log(:DEBUG)), - Redis.new(logger: log(:INFO)) -] - -begin - require "log4r" - - logging_redises += [ - Redis.new(logger: log(:DEBUG, Log4r)), - Redis.new(logger: log(:INFO, Log4r)) - ] -rescue LoadError - warn "Log4r not installed. `gem install log4r` if you want to compare it against Ruby's " \ - "Logger (spoiler: it's much faster)." -end - -benchmark "Default options (no logger)" do - stress(default) -end - -logging_redises.each do |redis| - logger = redis._client.logger - - case logger - when Logger - level = Logger::SEV_LABEL[logger.level] - when Log4r::Logger - level = logger.levels[logger.level] - end - - benchmark "#{logger.class} on #{level}" do - stress(redis) - end -end - -run 10 diff --git a/lib/redis.rb b/lib/redis.rb index 29d3a913e..b114b1950 100644 --- a/lib/redis.rb +++ b/lib/redis.rb @@ -24,6 +24,8 @@ def deprecate!(message) include Commands + SERVER_URL_OPTIONS = %i(url host port path).freeze + # Create a new client instance # # @param [Hash] options @@ -37,13 +39,12 @@ def deprecate!(message) # @option options [Float] :connect_timeout (same as timeout) timeout for initial connect in seconds # @option options [String] :username Username to authenticate against server # @option options [String] :password Password to authenticate against server - # @option options [Integer] :db (0) Database to select after initial connect + # @option options [Integer] :db (0) Database to select after connect and on reconnects # @option options [Symbol] :driver Driver to use, currently supported: `:ruby`, `:hiredis` # @option options [String] :id ID for the client connection, assigns name to current connection by sending # `CLIENT SETNAME` - # @option options [Hash, Integer] :tcp_keepalive Keepalive values, if Integer `intvl` and `probe` are calculated - # based on the value, if Hash `time`, `intvl` and `probes` can be specified as a Integer - # @option options [Integer] :reconnect_attempts Number of attempts trying to connect + # @option options [Integer, Array] :reconnect_attempts Number of attempts trying to connect, + # or a list of sleep duration between attempts. # @option options [Boolean] :inherit_socket (false) Whether to use socket in forked process or not # @option options [Array] :sentinels List of sentinels to contact # @option options [Symbol] :role (:master) Role to fetch via Sentinel, either `:master` or `:slave` @@ -56,23 +57,22 @@ def deprecate!(message) # @return [Redis] a new client instance def initialize(options = {}) @options = options.dup + @options[:reconnect_attempts] = 1 unless @options.key?(:reconnect_attempts) + if ENV["REDIS_URL"] && SERVER_URL_OPTIONS.none? { |o| @options.key?(o) } + @options[:url] = ENV["REDIS_URL"] + end + inherit_socket = @options.delete(:inherit_socket) @cluster_mode = options.key?(:cluster) client = @cluster_mode ? Cluster : Client - @original_client = @client = client.new(options) + @original_client = @client = client.new(@options) + @client.inherit_socket! if inherit_socket @queue = Hash.new { |h, k| h[k] = [] } @monitor = Monitor.new end - # Run code with the client reconnecting - def with_reconnect(val = true, &blk) - synchronize do |client| - client.with_reconnect(val, &blk) - end - end - # Run code without the client reconnecting - def without_reconnect(&blk) - with_reconnect(false, &blk) + def without_reconnect(&block) + @original_client.disable_reconnection(&block) end # Test whether or not the client is connected @@ -82,7 +82,7 @@ def connected? # Disconnect the client as quickly and silently as possible. def close - @original_client.disconnect + @original_client.close end alias disconnect! close @@ -95,59 +95,15 @@ def _client end def pipelined - pipeline = Pipeline.new(@client) - pipelined_connection = PipelinedConnection.new(pipeline) - yield pipelined_connection synchronize do |client| - client.call_pipeline(pipeline) - end - end - - # Mark the start of a transaction block. - # - # Passing a block is optional. - # - # @example With a block - # redis.multi do |multi| - # multi.set("key", "value") - # multi.incr("counter") - # end # => ["OK", 6] - # - # @example Without a block - # redis.multi - # # => "OK" - # redis.set("key", "value") - # # => "QUEUED" - # redis.incr("counter") - # # => "QUEUED" - # redis.exec - # # => ["OK", 6] - # - # @yield [multi] the commands that are called inside this block are cached - # and written to the server upon returning from it - # @yieldparam [Redis] multi `self` - # - # @return [String, Array<...>] - # - when a block is not given, `OK` - # - when a block is given, an array with replies - # - # @see #watch - # @see #unwatch - def multi - if block_given? - pipeline = Pipeline::Multi.new(@client) - pipelined_connection = PipelinedConnection.new(pipeline) - yield pipelined_connection - synchronize do |client| - client.call_pipeline(pipeline) + client.pipelined do |raw_pipeline| + yield PipelinedConnection.new(raw_pipeline) end - else - send_command([:multi]) end end def id - @original_client.id + @original_client.config.id || @original_client.config.server_url end def inspect @@ -165,8 +121,8 @@ def connection host: @original_client.host, port: @original_client.port, db: @original_client.db, - id: @original_client.id, - location: @original_client.location + id: id, + location: "#{@original_client.host}:#{@original_client.port}" } end @@ -184,7 +140,7 @@ def send_command(command, &block) def send_blocking_command(command, timeout, &block) @monitor.synchronize do - @client.call_with_timeout(command, timeout, &block) + @client.blocking_call(timeout, command, &block) end end @@ -192,7 +148,7 @@ def _subscription(method, timeout, channels, block) return @client.call([method] + channels) if subscribed? begin - original, @client = @client, SubscribedClient.new(@client) + original, @client = @client, SubscribedClient.new(@client.pubsub) if timeout > 0 @client.send(method, timeout, *channels, &block) else @@ -205,7 +161,6 @@ def _subscription(method, timeout, channels, block) end require "redis/version" -require "redis/connection" require "redis/client" require "redis/cluster" require "redis/pipeline" diff --git a/lib/redis/client.rb b/lib/redis/client.rb index a617b48a7..e043df43a 100644 --- a/lib/redis/client.rb +++ b/lib/redis/client.rb @@ -1,665 +1,109 @@ # frozen_string_literal: true -require "socket" -require "cgi" -require "redis/errors" +require 'redis-client' class Redis - class Client - # Defaults are also used for converting string keys to symbols. - DEFAULTS = { - url: -> { ENV["REDIS_URL"] }, - scheme: "redis", - host: "127.0.0.1", - port: 6379, - path: nil, - read_timeout: nil, - write_timeout: nil, - connect_timeout: nil, - timeout: 5.0, - username: nil, - password: nil, - db: 0, - driver: nil, - id: nil, - tcp_keepalive: 0, - reconnect_attempts: 1, - reconnect_delay: 0, - reconnect_delay_max: 0.5, - inherit_socket: false, - logger: nil, - sentinels: nil, - role: nil + class Client < ::RedisClient + ERROR_MAPPING = { + RedisClient::ConnectionError => Redis::ConnectionError, + RedisClient::CommandError => Redis::CommandError, + RedisClient::ReadTimeoutError => Redis::TimeoutError, + RedisClient::CannotConnectError => Redis::CannotConnectError, + RedisClient::AuthenticationError => Redis::CannotConnectError, + RedisClient::PermissionError => Redis::PermissionError, + RedisClient::WrongTypeError => Redis::WrongTypeError, + RedisClient::RESP3::UnknownType => Redis::ProtocolError, }.freeze - attr_reader :options, :connection, :command_map - - def scheme - @options[:scheme] - end - - def host - @options[:host] - end - - def port - @options[:port] - end - - def path - @options[:path] + class << self + def config(**kwargs) + ::RedisClient.config(protocol: 2, **kwargs) + end end - def read_timeout - @options[:read_timeout] + def initialize(*) + super + @inherit_socket = false + @pid = Process.pid end + ruby2_keywords :initialize if respond_to?(:ruby2_keywords, true) - def connect_timeout - @options[:connect_timeout] + def server_url + config.server_url end def timeout - @options[:read_timeout] - end - - def username - @options[:username] - end - - def password - @options[:password] + config.read_timeout end def db - @options[:db] - end - - def db=(db) - @options[:db] = db.to_i - end - - def driver - @options[:driver] - end - - def inherit_socket? - @options[:inherit_socket] - end - - attr_accessor :logger - - def initialize(options = {}) - @options = _parse_options(options) - @reconnect = true - @logger = @options[:logger] - @connection = nil - @command_map = {} - - @pending_reads = 0 - - @connector = - if !@options[:sentinels].nil? - Connector::Sentinel.new(@options) - elsif options.include?(:connector) && options[:connector].respond_to?(:new) - options.delete(:connector).new(@options) - else - Connector.new(@options) - end - end - - def connect - @pid = Process.pid - - # Don't try to reconnect when the connection is fresh - with_reconnect(false) do - establish_connection - if password - if username - begin - call [:auth, username, password] - rescue CommandError => err # Likely on Redis < 6 - case err.message - when /ERR wrong number of arguments for 'auth' command/ - call [:auth, password] - when /WRONGPASS invalid username-password pair/ - begin - call [:auth, password] - rescue CommandError - raise err - end - ::Redis.deprecate!( - "[redis-rb] The Redis connection was configured with username #{username.inspect}, but" \ - " the provided password was for the default user. This will start failing in redis-rb 5.0.0." - ) - else - raise - end - end - else - call [:auth, password] - end - end - - call [:readonly] if @options[:readonly] - call [:select, db] if db != 0 - call [:client, :setname, @options[:id]] if @options[:id] - @connector.check(self) - end - - self - end - - def id - @options[:id] || "#{@options[:ssl] ? 'rediss' : @options[:scheme]}://#{location}/#{db}" - end - - def location - path || "#{host}:#{port}" - end - - def call(command) - reply = process([command]) { read } - raise reply if reply.is_a?(CommandError) - - if block_given? && reply != 'QUEUED' - yield reply - else - reply - end - end - - def call_loop(command, timeout = 0) - error = nil - - result = with_socket_timeout(timeout) do - process([command]) do - loop do - reply = read - if reply.is_a?(CommandError) - error = reply - break - else - yield reply - end - end - end - end - - # Raise error when previous block broke out of the loop. - raise error if error - - # Result is set to the value that the provided block used to break. - result - end - - def call_pipeline(pipeline) - return [] if pipeline.futures.empty? - - with_reconnect pipeline.with_reconnect? do - pipeline.finish(call_pipelined(pipeline)).tap do - self.db = pipeline.db if pipeline.db - end - rescue ConnectionError => e - return nil if pipeline.shutdown? - - # Assume the pipeline was sent in one piece, but execution of - # SHUTDOWN caused none of the replies for commands that were executed - # prior to it from coming back around. - raise e - end - end - - def call_pipelined(pipeline) - return [] if pipeline.futures.empty? - - # The method #ensure_connected (called from #process) reconnects once on - # I/O errors. To make an effort in making sure that commands are not - # executed more than once, only allow reconnection before the first reply - # has been read. When an error occurs after the first reply has been - # read, retrying would re-execute the entire pipeline, thus re-issuing - # already successfully executed commands. To circumvent this, don't retry - # after the first reply has been read successfully. - - commands = pipeline.commands - - result = Array.new(commands.size) - reconnect = @reconnect - - begin - exception = nil - - process(commands) do - pipeline.timeouts.each_with_index do |timeout, i| - reply = if timeout - with_socket_timeout(timeout) { read } - else - read - end - result[i] = reply - @reconnect = false - exception = reply if exception.nil? && reply.is_a?(CommandError) - end - end - - raise exception if exception - ensure - @reconnect = reconnect - end - - result - end - - def call_with_timeout(command, extra_timeout, &blk) - timeout = extra_timeout == 0 ? 0 : self.timeout + extra_timeout - with_socket_timeout(timeout) do - call(command, &blk) - end - rescue ConnectionError - retry - end - - def call_without_timeout(command, &blk) - call_with_timeout(command, 0, &blk) - end - - def process(commands) - logging(commands) do - ensure_connected do - commands.each do |command| - if command_map[command.first] - command = command.dup - command[0] = command_map[command.first] - end - - write(command) - end - - yield if block_given? - end - end - end - - def connected? - !!(connection && connection.connected?) - end - - def disconnect - connection.disconnect if connected? - end - alias close disconnect - - def reconnect - disconnect - connect + config.db end - def io - yield - rescue TimeoutError => e1 - # Add a message to the exception without destroying the original stack - e2 = TimeoutError.new("Connection timed out") - e2.set_backtrace(e1.backtrace) - raise e2 - rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED, Errno::EBADF, Errno::EINVAL, EOFError => e - raise ConnectionError, "Connection lost (%s)" % [e.class.name.split("::").last] - end - - def read - io do - value = connection.read - @pending_reads -= 1 - value - end + def host + config.host unless config.path end - def write(command) - io do - @pending_reads += 1 - connection.write(command) - end + def port + config.port unless config.path end - def with_socket_timeout(timeout) - connect unless connected? - original = @options[:read_timeout] - - begin - connection.timeout = timeout - @options[:read_timeout] = timeout # for reconnection - yield - ensure - connection.timeout = self.timeout if connected? - @options[:read_timeout] = original - end + def path + config.path end - def without_socket_timeout(&blk) - with_socket_timeout(0, &blk) + def username + config.username end - def with_reconnect(val = true) - original, @reconnect = @reconnect, val - yield - ensure - @reconnect = original + def password + config.password end - def without_reconnect(&blk) - with_reconnect(false, &blk) + def call(command, &block) + super(*command, &block) + rescue ::RedisClient::Error => error + raise ERROR_MAPPING.fetch(error.class), error.message, error.backtrace end - protected - - def logging(commands) - return yield unless @logger&.debug? - - begin - commands.each do |name, *args| - logged_args = args.map do |a| - if a.respond_to?(:inspect) then a.inspect - elsif a.respond_to?(:to_s) then a.to_s - else - # handle poorly-behaved descendants of BasicObject - klass = a.instance_exec { (class << self; self end).superclass } - "\#<#{klass}:#{a.__id__}>" - end - end - @logger.debug("[Redis] command=#{name.to_s.upcase} args=#{logged_args.join(' ')}") - end - - t1 = Time.now - yield - ensure - @logger.debug("[Redis] call_time=%0.2f ms" % ((Time.now - t1) * 1000)) if t1 - end + def multi + super + rescue ::RedisClient::Error => error + raise ERROR_MAPPING.fetch(error.class), error.message, error.backtrace end - def establish_connection - server = @connector.resolve.dup - - @options[:host] = server[:host] - @options[:port] = Integer(server[:port]) if server.include?(:port) - - @connection = @options[:driver].connect(@options) - @pending_reads = 0 - rescue TimeoutError, - SocketError, - Errno::EADDRNOTAVAIL, - Errno::ECONNREFUSED, - Errno::EHOSTDOWN, - Errno::EHOSTUNREACH, - Errno::ENETUNREACH, - Errno::ENOENT, - Errno::ETIMEDOUT, - Errno::EINVAL => error - - raise CannotConnectError, "Error connecting to Redis on #{location} (#{error.class})" + def pipelined + super + rescue ::RedisClient::Error => error + raise ERROR_MAPPING.fetch(error.class), error.message, error.backtrace end - def ensure_connected - disconnect if @pending_reads > 0 - - attempts = 0 - - begin - attempts += 1 - - if connected? - unless inherit_socket? || Process.pid == @pid - raise InheritedError, - "Tried to use a connection from a child process without reconnecting. " \ - "You need to reconnect to Redis after forking " \ - "or set :inherit_socket to true." - end - else - connect - end - - yield - rescue BaseConnectionError - disconnect - - if attempts <= @options[:reconnect_attempts] && @reconnect - sleep_t = [(@options[:reconnect_delay] * 2**(attempts - 1)), - @options[:reconnect_delay_max]].min - - Kernel.sleep(sleep_t) - retry - else - raise - end - rescue Exception - disconnect - raise - end + def blocking_call(timeout, command, &block) + timeout += self.timeout if timeout && timeout > 0 + super(timeout, *command, &block) + rescue ::RedisClient::Error => error + raise ERROR_MAPPING.fetch(error.class), error.message, error.backtrace end - def _parse_options(options) - return options if options[:_parsed] - - defaults = DEFAULTS.dup - options = options.dup - - defaults.each_key do |key| - # Fill in defaults if needed - defaults[key] = defaults[key].call if defaults[key].respond_to?(:call) - - # Symbolize only keys that are needed - options[key] = options[key.to_s] if options.key?(key.to_s) - end - - url = options[:url] - url = defaults[:url] if url.nil? - - # Override defaults from URL if given - if url - require "uri" - - uri = URI(url) - - case uri.scheme - when "unix" - defaults[:path] = uri.path - when "redis", "rediss" - defaults[:scheme] = uri.scheme - defaults[:host] = uri.host.sub(/\A\[(.*)\]\z/, '\1') if uri.host - defaults[:port] = uri.port if uri.port - defaults[:username] = CGI.unescape(uri.user) if uri.user && !uri.user.empty? - defaults[:password] = CGI.unescape(uri.password) if uri.password && !uri.password.empty? - defaults[:db] = uri.path[1..-1].to_i if uri.path - defaults[:role] = :master - else - raise ArgumentError, "invalid uri scheme '#{uri.scheme}'" - end - - defaults[:ssl] = true if uri.scheme == "rediss" - end - - # Use default when option is not specified or nil - defaults.each_key do |key| - options[key] = defaults[key] if options[key].nil? - end - - if options[:path] - # Unix socket - options[:scheme] = "unix" - options.delete(:host) - options.delete(:port) - else - # TCP socket - options[:host] = options[:host].to_s - options[:port] = options[:port].to_i - end - - if options.key?(:timeout) - options[:connect_timeout] ||= options[:timeout] - options[:read_timeout] ||= options[:timeout] - options[:write_timeout] ||= options[:timeout] - end - - options[:connect_timeout] = Float(options[:connect_timeout]) - options[:read_timeout] = Float(options[:read_timeout]) - options[:write_timeout] = Float(options[:write_timeout]) - - options[:reconnect_attempts] = options[:reconnect_attempts].to_i - options[:reconnect_delay] = options[:reconnect_delay].to_f - options[:reconnect_delay_max] = options[:reconnect_delay_max].to_f - - options[:db] = options[:db].to_i - options[:driver] = _parse_driver(options[:driver]) || Connection.drivers.last - - case options[:tcp_keepalive] - when Hash - %i[time intvl probes].each do |key| - unless options[:tcp_keepalive][key].is_a?(Integer) - raise "Expected the #{key.inspect} key in :tcp_keepalive to be an Integer" - end - end - - when Integer - if options[:tcp_keepalive] >= 60 - options[:tcp_keepalive] = { time: options[:tcp_keepalive] - 20, intvl: 10, probes: 2 } - - elsif options[:tcp_keepalive] >= 30 - options[:tcp_keepalive] = { time: options[:tcp_keepalive] - 10, intvl: 5, probes: 2 } - - elsif options[:tcp_keepalive] >= 5 - options[:tcp_keepalive] = { time: options[:tcp_keepalive] - 2, intvl: 2, probes: 1 } - end - end - - options[:_parsed] = true - - options + def disable_reconnection(&block) + ensure_connected(retryable: false, &block) end - def _parse_driver(driver) - driver = driver.to_s if driver.is_a?(Symbol) - - if driver.is_a?(String) - begin - require_relative "connection/#{driver}" - rescue LoadError, NameError - begin - require "redis/connection/#{driver}" - rescue LoadError, NameError => error - raise "Cannot load driver #{driver.inspect}: #{error.message}" - end - end - - driver = Connection.const_get(driver.capitalize) - end - - driver + def inherit_socket! + @inherit_socket = true end - class Connector - def initialize(options) - @options = options.dup - end + private - def resolve - @options + def ensure_connected(retryable: true) + unless @inherit_socket || Process.pid == @pid + raise InheritedError, + "Tried to use a connection from a child process without reconnecting. " \ + "You need to reconnect to Redis after forking " \ + "or set :inherit_socket to true." end - def check(client); end - - class Sentinel < Connector - def initialize(options) - super(options) - - @options[:db] = DEFAULTS.fetch(:db) - - @sentinels = @options.delete(:sentinels).dup - @role = (@options[:role] || "master").to_s - @master = @options[:host] - end - - def check(client) - # Check the instance is really of the role we are looking for. - # We can't assume the command is supported since it was introduced - # recently and this client should work with old stuff. - begin - role = client.call([:role])[0] - rescue Redis::CommandError - # Assume the test is passed if we can't get a reply from ROLE... - role = @role - end - - if role != @role - client.disconnect - raise ConnectionError, "Instance role mismatch. Expected #{@role}, got #{role}." - end - end - - def resolve - result = case @role - when "master" - resolve_master - when "slave" - resolve_slave - else - raise ArgumentError, "Unknown instance role #{@role}" - end - - result || (raise ConnectionError, "Unable to fetch #{@role} via Sentinel.") - end - - def sentinel_detect - @sentinels.each do |sentinel| - client = Client.new(@options.merge({ - host: sentinel[:host] || sentinel["host"], - port: sentinel[:port] || sentinel["port"], - username: sentinel[:username] || sentinel["username"], - password: sentinel[:password] || sentinel["password"], - reconnect_attempts: 0 - })) - - begin - if result = yield(client) - # This sentinel responded. Make sure we ask it first next time. - @sentinels.delete(sentinel) - @sentinels.unshift(sentinel) - - return result - end - rescue BaseConnectionError - ensure - client.disconnect - end - end - - raise CannotConnectError, "No sentinels available." - end - - def resolve_master - sentinel_detect do |client| - if reply = client.call(["sentinel", "get-master-addr-by-name", @master]) - { host: reply[0], port: reply[1] } - end - end - end - - def resolve_slave - sentinel_detect do |client| - if reply = client.call(["sentinel", "slaves", @master]) - slaves = reply.map { |s| s.each_slice(2).to_h } - slaves.each { |s| s['flags'] = s.fetch('flags').split(',') } - slaves.reject! { |s| s.fetch('flags').include?('s_down') } - - if slaves.empty? - raise CannotConnectError, 'No slaves available.' - else - slave = slaves.sample - { - host: slave.fetch('ip'), - port: slave.fetch('port') - } - end - end - end - end - end + super end end end diff --git a/lib/redis/cluster.rb b/lib/redis/cluster.rb index a71a45f1d..6bc60e2d6 100644 --- a/lib/redis/cluster.rb +++ b/lib/redis/cluster.rb @@ -1,16 +1,16 @@ # frozen_string_literal: true -require_relative 'errors' -require_relative 'client' -require_relative 'cluster/command' -require_relative 'cluster/command_loader' -require_relative 'cluster/key_slot_converter' -require_relative 'cluster/node' -require_relative 'cluster/node_key' -require_relative 'cluster/node_loader' -require_relative 'cluster/option' -require_relative 'cluster/slot' -require_relative 'cluster/slot_loader' +require 'redis/errors' +require 'redis/client' +require 'redis/cluster/command' +require 'redis/cluster/command_loader' +require 'redis/cluster/key_slot_converter' +require 'redis/cluster/node' +require 'redis/cluster/node_key' +require 'redis/cluster/node_loader' +require 'redis/cluster/option' +require 'redis/cluster/slot' +require 'redis/cluster/slot_loader' class Redis # Redis Cluster client @@ -31,14 +31,6 @@ def id @node.map(&:id).sort.join(' ') end - # db feature is disabled in cluster mode - def db - 0 - end - - # db feature is disabled in cluster mode - def db=(_db); end - def timeout @node.first.timeout end diff --git a/lib/redis/commands/connection.rb b/lib/redis/commands/connection.rb index c70e0d052..d579eab6d 100644 --- a/lib/redis/commands/connection.rb +++ b/lib/redis/commands/connection.rb @@ -34,10 +34,7 @@ def echo(value) # @param [Integer] db zero-based index of the DB to use (0 to 15) # @return [String] `OK` def select(db) - synchronize do |client| - client.db = db - client.call([:select, db]) - end + send_command([:select, db]) end # Close the connection. @@ -48,7 +45,7 @@ def quit client.call([:quit]) rescue ConnectionError ensure - client.disconnect + client.close end end end diff --git a/lib/redis/commands/hyper_log_log.rb b/lib/redis/commands/hyper_log_log.rb index 39e31c7da..c7834b710 100644 --- a/lib/redis/commands/hyper_log_log.rb +++ b/lib/redis/commands/hyper_log_log.rb @@ -20,7 +20,7 @@ def pfadd(key, member) # @param [String, Array] keys # @return [Integer] def pfcount(*keys) - send_command([:pfcount] + keys) + send_command([:pfcount] + keys.flatten(1)) end # Merge multiple HyperLogLog values into an unique value that will approximate the cardinality of the union of diff --git a/lib/redis/commands/server.rb b/lib/redis/commands/server.rb index ff2d5b49c..5dc855cf9 100644 --- a/lib/redis/commands/server.rb +++ b/lib/redis/commands/server.rb @@ -117,9 +117,13 @@ def lastsave # # @yield a block to be called for every line of output # @yieldparam [String] line timestamp and command that was executed - def monitor(&block) + def monitor synchronize do |client| - client.call_loop([:monitor], &block) + client = client.pubsub + client.call([:monitor]) + loop do + yield client.next_event + end end end @@ -133,7 +137,7 @@ def save # Synchronously save the dataset to disk and then shut down the server. def shutdown synchronize do |client| - client.with_reconnect(false) do + client.disable_reconnection do client.call([:shutdown]) rescue ConnectionError # This means Redis has probably exited. diff --git a/lib/redis/commands/transactions.rb b/lib/redis/commands/transactions.rb index 227a144d6..2b0e24580 100644 --- a/lib/redis/commands/transactions.rb +++ b/lib/redis/commands/transactions.rb @@ -5,44 +5,26 @@ module Commands module Transactions # Mark the start of a transaction block. # - # Passing a block is optional. - # # @example With a block # redis.multi do |multi| # multi.set("key", "value") # multi.incr("counter") # end # => ["OK", 6] # - # @example Without a block - # redis.multi - # # => "OK" - # redis.set("key", "value") - # # => "QUEUED" - # redis.incr("counter") - # # => "QUEUED" - # redis.exec - # # => ["OK", 6] - # # @yield [multi] the commands that are called inside this block are cached # and written to the server upon returning from it # @yieldparam [Redis] multi `self` # - # @return [String, Array<...>] - # - when a block is not given, `OK` - # - when a block is given, an array with replies + # @return [Array<...>] + # - an array with replies # # @see #watch # @see #unwatch - def multi # :nodoc: - if block_given? - synchronize do |client| - pipeline = Pipeline::Multi.new(client) - pipelined_connection = PipelinedConnection.new(pipeline) - yield pipelined_connection - client.call_pipeline(pipeline) + def multi + synchronize do |client| + client.multi do |raw_transaction| + yield MultiConnection.new(raw_transaction) end - else - send_command([:multi]) end end @@ -121,8 +103,6 @@ def exec # Discard all commands issued after MULTI. # - # Only call this method when `#multi` was called **without** a block. - # # @return [String] `"OK"` # # @see #multi diff --git a/lib/redis/connection.rb b/lib/redis/connection.rb deleted file mode 100644 index b90a92149..000000000 --- a/lib/redis/connection.rb +++ /dev/null @@ -1,11 +0,0 @@ -# frozen_string_literal: true - -require "redis/connection/registry" - -# If a connection driver was required before this file, the array -# Redis::Connection.drivers will contain one or more classes. The last driver -# in this array will be used as default driver. If this array is empty, we load -# the plain Ruby driver as our default. Another driver can be required at a -# later point in time, causing it to be the last element of the #drivers array -# and therefore be chosen by default. -require_relative "connection/ruby" if Redis::Connection.drivers.empty? diff --git a/lib/redis/connection/command_helper.rb b/lib/redis/connection/command_helper.rb deleted file mode 100644 index f14f90a87..000000000 --- a/lib/redis/connection/command_helper.rb +++ /dev/null @@ -1,41 +0,0 @@ -# frozen_string_literal: true - -class Redis - module Connection - module CommandHelper - COMMAND_DELIMITER = "\r\n" - - def build_command(args) - command = [nil] - - args.each do |i| - if i.is_a? Array - i.each do |j| - j = j.to_s - j = j.encoding == Encoding::BINARY ? j : j.b - command << "$#{j.bytesize}" - command << j - end - else - i = i.to_s - i = i.encoding == Encoding::BINARY ? i : i.b - command << "$#{i.bytesize}" - command << i - end - end - - command[0] = "*#{(command.length - 1) / 2}" - - # Trailing delimiter - command << "" - command.join(COMMAND_DELIMITER) - end - - protected - - def encode(string) - string.force_encoding(Encoding.default_external) - end - end - end -end diff --git a/lib/redis/connection/hiredis.rb b/lib/redis/connection/hiredis.rb deleted file mode 100644 index 2e0a47222..000000000 --- a/lib/redis/connection/hiredis.rb +++ /dev/null @@ -1,68 +0,0 @@ -# frozen_string_literal: true - -require "redis/connection/registry" -require "redis/errors" - -require "hiredis/connection" -require "timeout" - -class Redis - module Connection - class Hiredis - def self.connect(config) - connection = ::Hiredis::Connection.new - connect_timeout = (config.fetch(:connect_timeout, 0) * 1_000_000).to_i - - if config[:scheme] == "unix" - connection.connect_unix(config[:path], connect_timeout) - elsif config[:scheme] == "rediss" || config[:ssl] - raise NotImplementedError, "SSL not supported by hiredis driver" - else - connection.connect(config[:host], config[:port], connect_timeout) - end - - instance = new(connection) - instance.timeout = config[:read_timeout] - instance - rescue Errno::ETIMEDOUT - raise TimeoutError - end - - def initialize(connection) - @connection = connection - end - - def connected? - @connection&.connected? - end - - def timeout=(timeout) - # Hiredis works with microsecond timeouts - @connection.timeout = Integer(timeout * 1_000_000) - end - - def disconnect - @connection.disconnect - @connection = nil - end - - def write(command) - @connection.write(command.flatten(1)) - rescue Errno::EAGAIN - raise TimeoutError - end - - def read - reply = @connection.read - reply = CommandError.new(reply.message) if reply.is_a?(RuntimeError) - reply - rescue Errno::EAGAIN - raise TimeoutError - rescue RuntimeError => err - raise ProtocolError, err.message - end - end - end -end - -Redis::Connection.drivers << Redis::Connection::Hiredis diff --git a/lib/redis/connection/registry.rb b/lib/redis/connection/registry.rb deleted file mode 100644 index 26ff8335d..000000000 --- a/lib/redis/connection/registry.rb +++ /dev/null @@ -1,13 +0,0 @@ -# frozen_string_literal: true - -class Redis - module Connection - # Store a list of loaded connection drivers in the Connection module. - # Redis::Client uses the last required driver by default, and will be aware - # of the loaded connection drivers if the user chooses to override the - # default connection driver. - def self.drivers - @drivers ||= [] - end - end -end diff --git a/lib/redis/connection/ruby.rb b/lib/redis/connection/ruby.rb deleted file mode 100644 index f2e2f6470..000000000 --- a/lib/redis/connection/ruby.rb +++ /dev/null @@ -1,435 +0,0 @@ -# frozen_string_literal: true - -require "redis/connection/registry" -require "redis/connection/command_helper" -require "redis/errors" - -require "socket" -require "timeout" - -begin - require "openssl" -rescue LoadError - # Not all systems have OpenSSL support -end - -class Redis - module Connection - module SocketMixin - CRLF = "\r\n" - - def initialize(*args) - super(*args) - - @timeout = @write_timeout = nil - @buffer = "".b - end - - def timeout=(timeout) - @timeout = (timeout if timeout && timeout > 0) - end - - def write_timeout=(timeout) - @write_timeout = (timeout if timeout && timeout > 0) - end - - def read(nbytes) - result = @buffer.slice!(0, nbytes) - - buffer = String.new(capacity: nbytes, encoding: Encoding::ASCII_8BIT) - result << _read_from_socket(nbytes - result.bytesize, buffer) while result.bytesize < nbytes - - result - end - - def gets - while (crlf = @buffer.index(CRLF)).nil? - @buffer << _read_from_socket(16_384) - end - - @buffer.slice!(0, crlf + CRLF.bytesize) - end - - def _read_from_socket(nbytes, buffer = nil) - loop do - case chunk = read_nonblock(nbytes, buffer, exception: false) - when :wait_readable - unless wait_readable(@timeout) - raise Redis::TimeoutError - end - when :wait_writable - unless wait_writable(@timeout) - raise Redis::TimeoutError - end - when nil - raise Errno::ECONNRESET - when String - return chunk - end - end - end - - def write(buffer) - return super(buffer) unless @write_timeout - - bytes_to_write = buffer.bytesize - total_bytes_written = 0 - loop do - case bytes_written = write_nonblock(buffer, exception: false) - when :wait_readable - unless wait_readable(@write_timeout) - raise Redis::TimeoutError - end - when :wait_writable - unless wait_writable(@write_timeout) - raise Redis::TimeoutError - end - when nil - raise Errno::ECONNRESET - when Integer - total_bytes_written += bytes_written - - if total_bytes_written >= bytes_to_write - return total_bytes_written - end - - buffer = buffer.byteslice(bytes_written..-1) - end - end - end - end - - if defined?(RUBY_ENGINE) && RUBY_ENGINE == "jruby" - - require "timeout" - - class TCPSocket < ::TCPSocket - include SocketMixin - - def self.connect(host, port, timeout) - Timeout.timeout(timeout) do - sock = new(host, port) - sock - end - rescue Timeout::Error - raise TimeoutError - end - end - - if defined?(::UNIXSocket) - - class UNIXSocket < ::UNIXSocket - include SocketMixin - - def self.connect(path, timeout) - Timeout.timeout(timeout) do - sock = new(path) - sock - end - rescue Timeout::Error - raise TimeoutError - end - - # JRuby raises Errno::EAGAIN on #read_nonblock even when it - # says it is readable (1.6.6, in both 1.8 and 1.9 mode). - # Use the blocking #readpartial method instead. - - def _read_from_socket(nbytes, _buffer = nil) - # JRuby: Throw away the buffer as we won't need it - # but still need to support the max arity of 2 - readpartial(nbytes) - rescue EOFError - raise Errno::ECONNRESET - end - end - - end - - else - - class TCPSocket < ::Socket - include SocketMixin - - def self.connect_addrinfo(addrinfo, port, timeout) - sock = new(::Socket.const_get(addrinfo[0]), Socket::SOCK_STREAM, 0) - sockaddr = ::Socket.pack_sockaddr_in(port, addrinfo[3]) - - begin - sock.connect_nonblock(sockaddr) - rescue Errno::EINPROGRESS - raise TimeoutError unless sock.wait_writable(timeout) - - begin - sock.connect_nonblock(sockaddr) - rescue Errno::EISCONN - end - end - - sock - end - - def self.connect(host, port, timeout) - # Don't pass AI_ADDRCONFIG as flag to getaddrinfo(3) - # - # From the man page for getaddrinfo(3): - # - # If hints.ai_flags includes the AI_ADDRCONFIG flag, then IPv4 - # addresses are returned in the list pointed to by res only if the - # local system has at least one IPv4 address configured, and IPv6 - # addresses are returned only if the local system has at least one - # IPv6 address configured. The loopback address is not considered - # for this case as valid as a configured address. - # - # We do want the IPv6 loopback address to be returned if applicable, - # even if it is the only configured IPv6 address on the machine. - # Also see: https://github.com/redis/redis-rb/pull/394. - addrinfo = ::Socket.getaddrinfo(host, nil, Socket::AF_UNSPEC, Socket::SOCK_STREAM) - - # From the man page for getaddrinfo(3): - # - # Normally, the application should try using the addresses in the - # order in which they are returned. The sorting function used - # within getaddrinfo() is defined in RFC 3484 [...]. - # - addrinfo.each_with_index do |ai, i| - return connect_addrinfo(ai, port, timeout) - rescue SystemCallError - # Raise if this was our last attempt. - raise if addrinfo.length == i + 1 - end - end - end - - class UNIXSocket < ::Socket - include SocketMixin - - def self.connect(path, timeout) - sock = new(::Socket::AF_UNIX, Socket::SOCK_STREAM, 0) - sockaddr = ::Socket.pack_sockaddr_un(path) - - begin - sock.connect_nonblock(sockaddr) - rescue Errno::EINPROGRESS - raise TimeoutError unless sock.wait_writable(timeout) - - begin - sock.connect_nonblock(sockaddr) - rescue Errno::EISCONN - end - end - - sock - end - end - - end - - if defined?(OpenSSL) - class SSLSocket < ::OpenSSL::SSL::SSLSocket - include SocketMixin - - unless method_defined?(:wait_readable) - def wait_readable(timeout = nil) - to_io.wait_readable(timeout) - end - end - - unless method_defined?(:wait_writable) - def wait_writable(timeout = nil) - to_io.wait_writable(timeout) - end - end - - def self.connect(host, port, timeout, ssl_params) - # NOTE: this is using Redis::Connection::TCPSocket - tcp_sock = TCPSocket.connect(host, port, timeout) - - ctx = OpenSSL::SSL::SSLContext.new - - # The provided parameters are merged into OpenSSL::SSL::SSLContext::DEFAULT_PARAMS - ctx.set_params(ssl_params || {}) - - ssl_sock = new(tcp_sock, ctx) - ssl_sock.hostname = host - - begin - # Initiate the socket connection in the background. If it doesn't fail - # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS) - # indicating the connection is in progress. - # Unlike waiting for a tcp socket to connect, you can't time out ssl socket - # connections during the connect phase properly, because IO.select only partially works. - # Instead, you have to retry. - ssl_sock.connect_nonblock - rescue Errno::EAGAIN, Errno::EWOULDBLOCK, IO::WaitReadable - if ssl_sock.wait_readable(timeout) - retry - else - raise TimeoutError - end - rescue IO::WaitWritable - if ssl_sock.wait_writable(timeout) - retry - else - raise TimeoutError - end - end - - unless ctx.verify_mode == OpenSSL::SSL::VERIFY_NONE || ( - ctx.respond_to?(:verify_hostname) && - !ctx.verify_hostname - ) - ssl_sock.post_connection_check(host) - end - - ssl_sock - end - end - end - - class Ruby - include Redis::Connection::CommandHelper - - MINUS = "-" - PLUS = "+" - COLON = ":" - DOLLAR = "$" - ASTERISK = "*" - - def self.connect(config) - if config[:scheme] == "unix" - raise ArgumentError, "SSL incompatible with unix sockets" if config[:ssl] - - sock = UNIXSocket.connect(config[:path], config[:connect_timeout]) - elsif config[:scheme] == "rediss" || config[:ssl] - sock = SSLSocket.connect(config[:host], config[:port], config[:connect_timeout], config[:ssl_params]) - else - sock = TCPSocket.connect(config[:host], config[:port], config[:connect_timeout]) - end - - instance = new(sock) - instance.timeout = config[:read_timeout] - instance.write_timeout = config[:write_timeout] - instance.set_tcp_keepalive config[:tcp_keepalive] - instance.set_tcp_nodelay if sock.is_a? TCPSocket - instance - end - - if %i[SOL_SOCKET SO_KEEPALIVE SOL_TCP TCP_KEEPIDLE TCP_KEEPINTVL TCP_KEEPCNT].all? { |c| Socket.const_defined? c } - def set_tcp_keepalive(keepalive) - return unless keepalive.is_a?(Hash) - - @sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true) - @sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, keepalive[:time]) - @sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, keepalive[:intvl]) - @sock.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, keepalive[:probes]) - end - - def get_tcp_keepalive - { - time: @sock.getsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE).int, - intvl: @sock.getsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL).int, - probes: @sock.getsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT).int - } - end - else - def set_tcp_keepalive(keepalive); end - - def get_tcp_keepalive - { - } - end - end - - # disables Nagle's Algorithm, prevents multiple round trips with MULTI - if %i[IPPROTO_TCP TCP_NODELAY].all? { |c| Socket.const_defined? c } - def set_tcp_nodelay - @sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) - end - else - def set_tcp_nodelay; end - end - - def initialize(sock) - @sock = sock - end - - def connected? - !!@sock - end - - def disconnect - @sock.close - rescue - ensure - @sock = nil - end - - def timeout=(timeout) - @sock.timeout = timeout if @sock.respond_to?(:timeout=) - end - - def write_timeout=(timeout) - @sock.write_timeout = timeout - end - - def write(command) - @sock.write(build_command(command)) - end - - def read - line = @sock.gets - reply_type = line.slice!(0, 1) - format_reply(reply_type, line) - rescue Errno::EAGAIN - raise TimeoutError - rescue OpenSSL::SSL::SSLError => ssl_error - if ssl_error.message.match?(/SSL_read: unexpected eof while reading/i) - raise EOFError, ssl_error.message - else - raise - end - end - - def format_reply(reply_type, line) - case reply_type - when MINUS then format_error_reply(line) - when PLUS then format_status_reply(line) - when COLON then format_integer_reply(line) - when DOLLAR then format_bulk_reply(line) - when ASTERISK then format_multi_bulk_reply(line) - else raise ProtocolError, reply_type - end - end - - def format_error_reply(line) - CommandError.new(line.strip) - end - - def format_status_reply(line) - line.strip - end - - def format_integer_reply(line) - line.to_i - end - - def format_bulk_reply(line) - bulklen = line.to_i - return if bulklen == -1 - - reply = encode(@sock.read(bulklen)) - @sock.read(2) # Discard CRLF. - reply - end - - def format_multi_bulk_reply(line) - n = line.to_i - return if n == -1 - - Array.new(n) { read } - end - end - end -end - -Redis::Connection.drivers << Redis::Connection::Ruby diff --git a/lib/redis/distributed.rb b/lib/redis/distributed.rb index 6e09725aa..14f37f50c 100644 --- a/lib/redis/distributed.rb +++ b/lib/redis/distributed.rb @@ -20,7 +20,7 @@ def message def initialize(node_configs, options = {}) @tag = options[:tag] || /^\{(.+?)\}/ @ring = options[:ring] || HashRing.new - @node_configs = node_configs.dup + @node_configs = node_configs.map(&:dup) @default_options = options.dup node_configs.each { |node_config| add_node(node_config) } @subscribed_node = nil @@ -41,6 +41,8 @@ def nodes def add_node(options) options = { url: options } if options.is_a?(String) options = @default_options.merge(options) + options.delete(:tag) + options.delete(:ring) @ring.add_node Redis.new(options) end @@ -908,9 +910,7 @@ def pipelined def multi(&block) raise CannotDistribute, :multi unless @watch_key - result = node_for(@watch_key).multi(&block) - @watch_key = nil if block_given? - result + node_for(@watch_key).multi(&block) end # Execute all commands issued after MULTI. diff --git a/lib/redis/errors.rb b/lib/redis/errors.rb index 74a772713..1af642956 100644 --- a/lib/redis/errors.rb +++ b/lib/redis/errors.rb @@ -20,6 +20,12 @@ def initialize(reply_type) class CommandError < BaseError end + class PermissionError < CommandError + end + + class WrongTypeError < CommandError + end + # Base error for connection related errors. class BaseConnectionError < BaseError end diff --git a/lib/redis/pipeline.rb b/lib/redis/pipeline.rb index eebb31318..9f77e70a1 100644 --- a/lib/redis/pipeline.rb +++ b/lib/redis/pipeline.rb @@ -4,27 +4,30 @@ class Redis class PipelinedConnection - def initialize(pipeline) + attr_accessor :db + + def initialize(pipeline, futures = []) @pipeline = pipeline + @futures = futures end include Commands - def db - @pipeline.db - end - - def db=(db) - @pipeline.db = db - end - def pipelined yield self end - def call_pipeline(pipeline) - @pipeline.call_pipeline(pipeline) - nil + def multi + transaction = MultiConnection.new(@pipeline, @futures) + send_command([:multi]) + size = @futures.size + yield transaction + multi_future = MultiFuture.new(@futures[size..-1]) + @pipeline.call(:exec) do |result| + multi_future._set(result) + end + @futures << multi_future + multi_future end private @@ -34,161 +37,39 @@ def synchronize end def send_command(command, &block) - @pipeline.call(command, &block) + future = Future.new(command, block) + @pipeline.call(*command) do |result| + future._set(result) + end + @futures << future + future end def send_blocking_command(command, timeout, &block) - @pipeline.call_with_timeout(command, timeout, &block) - end - end - - class Pipeline - REDIS_INTERNAL_PATH = File.expand_path("..", __dir__).freeze - # Redis use MonitorMixin#synchronize and this class use DelegateClass which we want to filter out. - # Both are in the stdlib so we can simply filter the entire stdlib out. - STDLIB_PATH = File.expand_path("..", MonitorMixin.instance_method(:synchronize).source_location.first).freeze - - attr_accessor :db - attr_reader :client - - attr :futures - alias materialized_futures futures - - def initialize(client) - @client = client.is_a?(Pipeline) ? client.client : client - @with_reconnect = true - @shutdown = false - @futures = [] - end - - def timeout - client.timeout - end - - def with_reconnect? - @with_reconnect - end - - def without_reconnect? - !@with_reconnect - end - - def shutdown? - @shutdown - end - - def empty? - @futures.empty? - end - - def call(command, timeout: nil, &block) - # A pipeline that contains a shutdown should not raise ECONNRESET when - # the connection is gone. - @shutdown = true if command.first == :shutdown - future = Future.new(command, block, timeout) + future = Future.new(command, block) + @pipeline.blocking_call(timeout, *command) do |result| + future._set(result) + end @futures << future future end + end - def call_with_timeout(command, timeout, &block) - call(command, timeout: timeout, &block) - end - - def call_pipeline(pipeline) - @shutdown = true if pipeline.shutdown? - @futures.concat(pipeline.materialized_futures) - @db = pipeline.db - nil - end - - def commands - @futures.map(&:_command) - end - - def timeouts - @futures.map(&:timeout) - end - - def with_reconnect(val = true) - @with_reconnect = false unless val - yield - end - - def without_reconnect(&blk) - with_reconnect(false, &blk) - end - - def finish(replies, &blk) - if blk - futures.each_with_index.map do |future, i| - future._set(blk.call(replies[i])) - end - else - futures.each_with_index.map do |future, i| - future._set(replies[i]) - end - end + class MultiConnection < PipelinedConnection + def multi + raise Redis::Error, "Can't nest multi transaction" end - class Multi < self - def finish(replies) - exec = replies.last - - return if exec.nil? # The transaction failed because of WATCH. - - # EXEC command failed. - raise exec if exec.is_a?(CommandError) - - if exec.size < futures.size - # Some command wasn't recognized by Redis. - command_error = replies.detect { |r| r.is_a?(CommandError) } - raise command_error - end - - super(exec) do |reply| - # Because an EXEC returns nested replies, hiredis won't be able to - # convert an error reply to a CommandError instance itself. This is - # specific to MULTI/EXEC, so we solve this here. - reply.is_a?(::RuntimeError) ? CommandError.new(reply.message) : reply - end - end - - def materialized_futures - if empty? - [] - else - [ - Future.new([:multi], nil, 0), - *futures, - MultiFuture.new(futures) - ] - end - end - - def timeouts - if empty? - [] - else - [nil, *super, nil] - end - end + private - def commands - if empty? - [] - else - [[:multi]] + super + [[:exec]] - end - end + # Blocking commands inside transaction behave like non-blocking. + # It shouldn't be done though. + # https://redis.io/commands/blpop/#blpop-inside-a-multi--exec-transaction + def send_blocking_command(command, _timeout, &block) + send_command(command, &block) end end - class DeprecatedPipeline < DelegateClass(Pipeline) - end - - class DeprecatedMulti < DelegateClass(Pipeline::Multi) - end - class FutureNotReady < RuntimeError def initialize super("Value will be available once the pipeline executes.") @@ -198,13 +79,10 @@ def initialize class Future < BasicObject FutureNotReady = ::Redis::FutureNotReady.new - attr_reader :timeout - - def initialize(command, transformation, timeout) + def initialize(command, coerce) @command = command - @transformation = transformation - @timeout = timeout @object = FutureNotReady + @coerce = coerce end def ==(_other) @@ -222,16 +100,12 @@ def inspect end def _set(object) - @object = @transformation ? @transformation.call(object) : object + @object = @coerce ? @coerce.call(object) : object value end - def _command - @command - end - def value - ::Kernel.raise(@object) if @object.is_a?(::RuntimeError) + ::Kernel.raise(@object) if @object.is_a?(::StandardError) @object end @@ -248,13 +122,16 @@ class MultiFuture < Future def initialize(futures) @futures = futures @command = [:exec] + @object = FutureNotReady end def _set(replies) - @futures.each_with_index do |future, index| - future._set(replies[index]) + if replies + @futures.each_with_index do |future, index| + future._set(replies[index]) + end end - replies + @object = replies end end end diff --git a/lib/redis/subscribe.rb b/lib/redis/subscribe.rb index 5573b98f1..e1944f593 100644 --- a/lib/redis/subscribe.rb +++ b/lib/redis/subscribe.rb @@ -7,7 +7,7 @@ def initialize(client) end def call(command) - @client.process([command]) + @client.call(command) end def subscribe(*channels, &block) @@ -39,13 +39,15 @@ def punsubscribe(*channels) def subscription(start, stop, channels, block, timeout = 0) sub = Subscription.new(&block) - unsubscribed = false + @client.call([start, *channels]) + while event = @client.next_event(timeout) + if event.is_a?(::RedisClient::CommandError) + raise Client::ERROR_MAPPING.fetch(event.class), event.message + end - @client.call_loop([start, *channels], timeout) do |line| - type, *rest = line + type, *rest = event sub.callbacks[type].call(*rest) - unsubscribed = type == stop && rest.last == 0 - break if unsubscribed + break if type == stop && rest.last == 0 end # No need to unsubscribe here. The real client closes the connection # whenever an exception is raised (see #ensure_connected). diff --git a/redis.gemspec b/redis.gemspec index 519d248b8..2747c1e00 100644 --- a/redis.gemspec +++ b/redis.gemspec @@ -44,4 +44,6 @@ Gem::Specification.new do |s| s.executables = `git ls-files -- exe/*`.split("\n").map { |f| File.basename(f) } s.required_ruby_version = '>= 2.5.0' + + s.add_runtime_dependency('redis-client') end diff --git a/test/distributed/commands_requiring_clustering_test.rb b/test/distributed/commands_requiring_clustering_test.rb index 9f9aa82d5..db9002523 100644 --- a/test/distributed/commands_requiring_clustering_test.rb +++ b/test/distributed/commands_requiring_clustering_test.rb @@ -169,6 +169,6 @@ def test_bitop r.bitop(:xor, "{qux}foo^bar", "{qux}foo", "{qux}bar") assert_equal "\x03", r.get("{qux}foo^bar") r.bitop(:not, "{qux}~foo", "{qux}foo") - assert_equal "\x9E", r.get("{qux}~foo") + assert_equal "\x9E".b, r.get("{qux}~foo") end end diff --git a/test/distributed/distributed_test.rb b/test/distributed/distributed_test.rb index d03b52d60..8a5878498 100644 --- a/test/distributed/distributed_test.rb +++ b/test/distributed/distributed_test.rb @@ -21,15 +21,12 @@ def test_handle_multiple_servers end def test_add_nodes - logger = Logger.new("/dev/null") - - @r = Redis::Distributed.new NODES, logger: logger, timeout: 10 + @r = Redis::Distributed.new NODES, timeout: 10 assert_equal "127.0.0.1", @r.nodes[0]._client.host assert_equal PORT, @r.nodes[0]._client.port assert_equal 15, @r.nodes[0]._client.db assert_equal 10, @r.nodes[0]._client.timeout - assert_equal logger, @r.nodes[0]._client.logger @r.add_node("redis://127.0.0.1:6380/14") @@ -37,7 +34,6 @@ def test_add_nodes assert_equal 6380, @r.nodes[1]._client.port assert_equal 14, @r.nodes[1]._client.db assert_equal 10, @r.nodes[1]._client.timeout - assert_equal logger, @r.nodes[1]._client.logger end def test_pipelining_commands_cannot_be_distributed diff --git a/test/distributed/internals_test.rb b/test/distributed/internals_test.rb index ac3fad205..454828021 100644 --- a/test/distributed/internals_test.rb +++ b/test/distributed/internals_test.rb @@ -15,19 +15,19 @@ def test_provides_a_meaningful_inspect def test_default_as_urls nodes = ["redis://127.0.0.1:#{PORT}/15", *NODES] redis = Redis::Distributed.new nodes - assert_equal(["redis://127.0.0.1:#{PORT}/15", *NODES], redis.nodes.map { |node| node._client.id }) + assert_equal(["redis://127.0.0.1:#{PORT}/15", *NODES], redis.nodes.map { |node| node._client.server_url }) end def test_default_as_config_hashes nodes = [OPTIONS.merge(host: '127.0.0.1'), OPTIONS.merge(host: 'somehost', port: PORT.next)] redis = Redis::Distributed.new nodes - assert_equal(["redis://127.0.0.1:#{PORT}/15", "redis://somehost:#{PORT.next}/15"], redis.nodes.map { |node| node._client.id }) + assert_equal(["redis://127.0.0.1:#{PORT}/15", "redis://somehost:#{PORT.next}/15"], redis.nodes.map { |node| node._client.server_url }) end def test_as_mix_and_match nodes = ["redis://127.0.0.1:7389/15", OPTIONS.merge(host: 'somehost'), OPTIONS.merge(host: 'somehost', port: PORT.next)] redis = Redis::Distributed.new nodes - assert_equal(["redis://127.0.0.1:7389/15", "redis://somehost:#{PORT}/15", "redis://somehost:#{PORT.next}/15"], redis.nodes.map { |node| node._client.id }) + assert_equal(["redis://127.0.0.1:7389/15", "redis://somehost:#{PORT}/15", "redis://somehost:#{PORT.next}/15"], redis.nodes.map { |node| node._client.server_url }) end def test_override_id diff --git a/test/distributed/transactions_test.rb b/test/distributed/transactions_test.rb index 7cd8d5fb5..6be310134 100644 --- a/test/distributed/transactions_test.rb +++ b/test/distributed/transactions_test.rb @@ -5,22 +5,6 @@ class TestDistributedTransactions < Minitest::Test include Helper::Distributed - def test_multi_discard - r.set("foo", 1) - - r.watch("foo") - r.multi - r.set("foo", 2) - - assert_raises Redis::Distributed::CannotDistribute do - r.set("bar", 1) - end - - r.discard - - assert_equal('1', r.get("foo")) - end - def test_multi_discard_without_watch @foo = nil @@ -83,22 +67,4 @@ def test_watch_multi_with_block assert_equal [3, 6, 10], result end end - - def test_watch_multi_exec_without_block - r.set("{qux}baz", 1) - - assert_equal "OK", r.watch("{qux}foo", "{qux}bar", "{qux}baz") - assert_equal '1', r.get("{qux}baz") - - assert_raises Redis::Distributed::CannotDistribute do - r.get("{foo}baz") - end - - assert_equal "OK", r.multi - assert_equal "QUEUED", r.incrby("{qux}baz", 1) - assert_equal "QUEUED", r.incrby("{qux}baz", 1) - assert_equal [2, 3], r.exec - - assert_equal "OK", r.set("{other}baz", 1) - end end diff --git a/test/helper.rb b/test/helper.rb index 63a7f0ca6..802dce65c 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -13,7 +13,6 @@ Redis.silence_deprecations = true require "redis/distributed" -require "redis/connection/#{ENV['DRIVER']}" require_relative "support/redis_mock" require_relative 'support/cluster/orchestrator' @@ -109,7 +108,6 @@ module Generic alias r redis def setup - @log = StringIO.new @redis = init _new_client # Run GC to make sure orphaned connections are closed. @@ -212,7 +210,7 @@ module Client private def _format_options(options) - OPTIONS.merge(logger: ::Logger.new(@log)).merge(options) + OPTIONS.merge(options) end def _new_client(options = {}) @@ -231,7 +229,7 @@ module Sentinel LOCALHOST = '127.0.0.1' def build_sentinel_client(options = {}) - opts = { host: LOCALHOST, port: SENTINEL_PORT, timeout: TIMEOUT, logger: ::Logger.new(@log) } + opts = { host: LOCALHOST, port: SENTINEL_PORT, timeout: TIMEOUT } Redis.new(opts.merge(options)) end @@ -245,7 +243,7 @@ def _format_options(options = {}) { url: "redis://#{MASTER_NAME}", sentinels: [{ host: LOCALHOST, port: SENTINEL_PORT }], - role: :master, timeout: TIMEOUT, logger: ::Logger.new(@log) + role: :master, timeout: TIMEOUT, }.merge(options) end @@ -268,7 +266,6 @@ def version def _format_options(options) { timeout: OPTIONS[:timeout], - logger: ::Logger.new(@log) }.merge(options) end @@ -420,7 +417,6 @@ def _default_nodes(host: DEFAULT_HOST, ports: DEFAULT_PORTS) def _format_options(options) { timeout: OPTIONS[:timeout], - logger: ::Logger.new(@log), cluster: _default_nodes }.merge(options) end diff --git a/test/lint/authentication.rb b/test/lint/authentication.rb index 6dad12ef2..7f960e031 100644 --- a/test/lint/authentication.rb +++ b/test/lint/authentication.rb @@ -15,7 +15,7 @@ def test_auth_with_password def test_auth_for_acl target_version "6.0.0" do with_acl do |username, password| - assert_raises(Redis::BaseError) { redis.auth(username, 'wrongpassword') } + assert_raises(Redis::CannotConnectError) { redis.auth(username, 'wrongpassword') } assert_equal 'OK', redis.auth(username, password) assert_equal 'PONG', redis.ping assert_raises(Redis::BaseError) { redis.echo('foo') } diff --git a/test/lint/hashes.rb b/test/lint/hashes.rb index 3e9ff7de0..5e22c9e56 100644 --- a/test/lint/hashes.rb +++ b/test/lint/hashes.rb @@ -131,12 +131,12 @@ def test_hvals end def test_hgetall - assert(r.hgetall("foo") == {}) + assert_equal({}, r.hgetall("foo")) r.hset("foo", "f1", "s1") r.hset("foo", "f2", "s2") - assert(r.hgetall("foo") == { "f1" => "s1", "f2" => "s2" }) + assert_equal({ "f1" => "s1", "f2" => "s2" }, r.hgetall("foo")) end def test_hmset diff --git a/test/lint/streams.rb b/test/lint/streams.rb index 694f08c34..8bc761b1d 100644 --- a/test/lint/streams.rb +++ b/test/lint/streams.rb @@ -91,7 +91,7 @@ def test_xadd_with_maxlen_and_approximate_option end def test_xadd_with_invalid_arguments - assert_raises(Redis::CommandError) { redis.xadd(nil, {}) } + assert_raises(TypeError) { redis.xadd(nil, {}) } assert_raises(Redis::CommandError) { redis.xadd('', {}) } assert_raises(Redis::CommandError) { redis.xadd('s1', {}) } end @@ -146,8 +146,8 @@ def test_xdel_with_invalid_entry_ids end def test_xdel_with_invalid_arguments - assert_equal 0, redis.xdel(nil, nil) - assert_equal 0, redis.xdel(nil, [nil]) + assert_raises(TypeError) { redis.xdel(nil, nil) } + assert_raises(TypeError) { redis.xdel(nil, [nil]) } assert_equal 0, redis.xdel('', '') assert_equal 0, redis.xdel('', ['']) assert_raises(Redis::CommandError) { redis.xdel('s1', []) } @@ -222,7 +222,7 @@ def test_xrange_with_invalid_entry_id_options end def test_xrange_with_invalid_arguments - assert_equal([], redis.xrange(nil)) + assert_raises(TypeError) { redis.xrange(nil) } assert_equal([], redis.xrange('')) end @@ -298,7 +298,7 @@ def test_xrevrange_with_invalid_entry_id_options end def test_xrevrange_with_invalid_arguments - assert_equal([], redis.xrevrange(nil)) + assert_raises(TypeError) { redis.xrevrange(nil) } assert_equal([], redis.xrevrange('')) end @@ -313,7 +313,7 @@ def test_xlen_with_not_existed_key end def test_xlen_with_invalid_key - assert_equal 0, redis.xlen(nil) + assert_raises(TypeError) { redis.xlen(nil) } assert_equal 0, redis.xlen('') end @@ -369,7 +369,7 @@ def test_xread_does_not_raise_timeout_error_when_the_block_option_is_zero_msec end def test_xread_with_invalid_arguments - assert_raises(Redis::CommandError) { redis.xread(nil, nil) } + assert_raises(TypeError) { redis.xread(nil, nil) } assert_raises(Redis::CommandError) { redis.xread('', '') } assert_raises(Redis::CommandError) { redis.xread([], []) } assert_raises(Redis::CommandError) { redis.xread([''], ['']) } @@ -481,7 +481,7 @@ def test_xreadgroup_with_block_option end def test_xreadgroup_with_invalid_arguments - assert_raises(Redis::CommandError) { redis.xreadgroup(nil, nil, nil, nil) } + assert_raises(TypeError) { redis.xreadgroup(nil, nil, nil, nil) } assert_raises(Redis::CommandError) { redis.xreadgroup('', '', '', '') } assert_raises(Redis::CommandError) { redis.xreadgroup('', '', [], []) } assert_raises(Redis::CommandError) { redis.xreadgroup('', '', [''], ['']) } @@ -534,7 +534,7 @@ def test_xack_with_arrayed_entry_ids end def test_xack_with_invalid_arguments - assert_equal 0, redis.xack(nil, nil, nil) + assert_raises(TypeError) { redis.xack(nil, nil, nil) } assert_equal 0, redis.xack('', '', '') assert_raises(Redis::CommandError) { redis.xack('', '', []) } assert_equal 0, redis.xack('', '', ['']) @@ -641,7 +641,7 @@ def test_xclaim_with_justid_option end def test_xclaim_with_invalid_arguments - assert_raises(Redis::CommandError) { redis.xclaim(nil, nil, nil, nil, nil) } + assert_raises(TypeError) { redis.xclaim(nil, nil, nil, nil, nil) } assert_raises(Redis::CommandError) { redis.xclaim('', '', '', '', '') } end diff --git a/test/lint/strings.rb b/test/lint/strings.rb index 7e0a83fda..97adfa370 100644 --- a/test/lint/strings.rb +++ b/test/lint/strings.rb @@ -373,7 +373,7 @@ def test_bitop r.bitop(:xor, 'foo^bar{1}', 'foo{1}', 'bar{1}') assert_equal "\x03", r.get('foo^bar{1}') r.bitop(:not, '~foo{1}', 'foo{1}') - assert_equal "\x9E", r.get('~foo{1}') + assert_equal "\x9E".b, r.get('~foo{1}') end end end diff --git a/test/redis/blocking_commands_test.rb b/test/redis/blocking_commands_test.rb index 4d53c4f83..fe8a3f81b 100644 --- a/test/redis/blocking_commands_test.rb +++ b/test/redis/blocking_commands_test.rb @@ -47,6 +47,7 @@ def test_brpoplpush_disable_client_timeout end def test_brpoplpush_in_transaction + # TODO: redis-client transactions don't support blocking calls. results = r.multi do |transaction| transaction.brpoplpush('foo', 'bar') transaction.brpoplpush('foo', 'bar', timeout: 2) diff --git a/test/redis/client_test.rb b/test/redis/client_test.rb index c6f0030ac..68668593a 100644 --- a/test/redis/client_test.rb +++ b/test/redis/client_test.rb @@ -26,25 +26,9 @@ def test_call_raise end end - def test_client_with_custom_connector - custom_connector = Class.new(Redis::Client::Connector) do - def resolve - @options[:host] = '127.0.0.5' - @options[:port] = '999' - @options - end - end - - error = assert_raises do - new_redis = _new_client(connector: custom_connector) - new_redis.ping - end - assert_match(/Error connecting to Redis on 127\.0\.0\.5:999 (.+)/, error.message) - end - def test_mixed_encoding r.call("MSET", "fée", "\x00\xFF".b, "じ案".encode(Encoding::SHIFT_JIS), "\t".encode(Encoding::ASCII)) - assert_equal "\x00\xFF", r.call("GET", "fée") + assert_equal "\x00\xFF".b, r.call("GET", "fée") assert_equal "\t", r.call("GET", "じ案".encode(Encoding::SHIFT_JIS)) r.call("SET", "\x00\xFF", "fée") diff --git a/test/redis/command_map_test.rb b/test/redis/command_map_test.rb deleted file mode 100644 index 42917b5f6..000000000 --- a/test/redis/command_map_test.rb +++ /dev/null @@ -1,29 +0,0 @@ -# frozen_string_literal: true - -require "helper" - -class TestCommandMap < Minitest::Test - include Helper::Client - - def test_override_existing_commands - r.set("counter", 1) - - assert_equal 2, r.incr("counter") - - r._client.command_map[:incr] = :decr - - assert_equal 1, r.incr("counter") - end - - def test_override_non_existing_commands - r.set("key", "value") - - assert_raises Redis::CommandError do - r.idontexist("key") - end - - r._client.command_map[:idontexist] = :get - - assert_equal "value", r.idontexist("key") - end -end diff --git a/test/redis/connection_handling_test.rb b/test/redis/connection_handling_test.rb index 2a4e8b2a2..04946c8ea 100644 --- a/test/redis/connection_handling_test.rb +++ b/test/redis/connection_handling_test.rb @@ -17,7 +17,7 @@ def test_id assert_equal "PONG", redis.ping end - assert_equal ["setname", "client-name"], @name + assert_equal ["SETNAME", "client-name"], @name end def test_ping @@ -30,9 +30,9 @@ def test_select r.select 14 assert_nil r.get("foo") - r._client.disconnect + r._client.close - assert_nil r.get("foo") + assert_equal "bar", r.get("foo") end def test_quit @@ -156,25 +156,4 @@ def test_config_set ensure r.config :set, "timeout", 300 end - - driver(:ruby, :hiredis) do - def test_consistency_on_multithreaded_env - t = nil - - commands = { - set: ->(_key, _value) { t.kill; "+OK\r\n" }, - incr: ->(_key) { ":1\r\n" } - } - - redis_mock(commands) do |redis| - t = Thread.new do - redis.set("foo", "bar") - end - - t.join - - assert_equal 1, redis.incr("baz") - end - end - end end diff --git a/test/redis/connection_test.rb b/test/redis/connection_test.rb index d5172db72..7178c3ccc 100644 --- a/test/redis/connection_test.rb +++ b/test/redis/connection_test.rb @@ -6,7 +6,7 @@ class TestConnection < Minitest::Test include Helper::Client def test_provides_a_meaningful_inspect - assert_equal "#", r.inspect + assert_equal "#", r.inspect end def test_connection_with_user_and_password @@ -27,21 +27,12 @@ def test_connection_with_default_user_and_password end end - def test_connection_with_wrong_user_and_password - target_version "6.0" do - with_default_user_password do |_username, password| - redis = Redis.new(OPTIONS.merge(username: "does-not-exist", password: password)) - assert_equal "PONG", redis.ping - end - end - end - def test_connection_information - assert_equal "127.0.0.1", r.connection.fetch(:host) + assert_equal "localhost", r.connection.fetch(:host) assert_equal 6381, r.connection.fetch(:port) assert_equal 15, r.connection.fetch(:db) - assert_equal "127.0.0.1:6381", r.connection.fetch(:location) - assert_equal "redis://127.0.0.1:6381/15", r.connection.fetch(:id) + assert_equal "localhost:6381", r.connection.fetch(:location) + assert_equal "redis://localhost:6381/15", r.connection.fetch(:id) end def test_default_id_with_host_and_port @@ -55,18 +46,18 @@ def test_default_id_with_host_and_port_and_ssl end def test_default_id_with_host_and_port_and_explicit_scheme - redis = Redis.new(OPTIONS.merge(host: "host", port: "1234", db: 0, scheme: "foo")) - assert_equal "foo://host:1234/0", redis.connection.fetch(:id) + redis = Redis.new(OPTIONS.merge(host: "host", port: "1234", db: 0)) + assert_equal "redis://host:1234/0", redis.connection.fetch(:id) end def test_default_id_with_path redis = Redis.new(OPTIONS.merge(path: "/tmp/redis.sock", db: 0)) - assert_equal "unix:///tmp/redis.sock/0", redis.connection.fetch(:id) + assert_equal "/tmp/redis.sock/0", redis.connection.fetch(:id) end def test_default_id_with_path_and_explicit_scheme - redis = Redis.new(OPTIONS.merge(path: "/tmp/redis.sock", db: 0, scheme: "foo")) - assert_equal "unix:///tmp/redis.sock/0", redis.connection.fetch(:id) + redis = Redis.new(OPTIONS.merge(path: "/tmp/redis.sock", db: 0)) + assert_equal "/tmp/redis.sock/0", redis.connection.fetch(:id) end def test_override_id @@ -84,7 +75,7 @@ def test_id_inside_multi connection_id = redis.connection.fetch(:id) end - assert_equal "redis://127.0.0.1:6381/15", id - assert_equal "redis://127.0.0.1:6381/15", connection_id + assert_equal "redis://localhost:6381/15", id + assert_equal "redis://localhost:6381/15", connection_id end end diff --git a/test/redis/error_replies_test.rb b/test/redis/error_replies_test.rb index a8c3d0376..d7df0e862 100644 --- a/test/redis/error_replies_test.rb +++ b/test/redis/error_replies_test.rb @@ -38,15 +38,4 @@ def test_raise_first_error_reply_in_pipeline assert ex.message =~ /not an integer/i end end - - def test_recover_from_raise_in__call_loop - with_reconnection_check do - r._client.call_loop([:invalid_monitor]) do - assert false # Should never be executed - end - rescue => ex - ensure - assert ex.message =~ /unknown command/i - end - end end diff --git a/test/redis/fork_safety_test.rb b/test/redis/fork_safety_test.rb index 5bb08afcb..66326a649 100644 --- a/test/redis/fork_safety_test.rb +++ b/test/redis/fork_safety_test.rb @@ -5,6 +5,10 @@ class TestForkSafety < Minitest::Test include Helper::Client + def setup + skip("Fork unavailable") unless Process.respond_to?(:fork) + end + driver(:ruby, :hiredis) do def test_fork_safety redis = Redis.new(OPTIONS) @@ -26,8 +30,6 @@ def test_fork_safety assert_equal 127, status.exitstatus assert_equal "1", redis.get("foo") - rescue NotImplementedError => error - raise unless error.message =~ /fork is not available/ end def test_fork_safety_with_enabled_inherited_socket @@ -50,8 +52,6 @@ def test_fork_safety_with_enabled_inherited_socket assert_equal 0, status.exitstatus assert_equal "2", redis.get("foo") - rescue NotImplementedError => error - raise unless error.message =~ /fork is not available/ end end end diff --git a/test/redis/internals_test.rb b/test/redis/internals_test.rb index 80c0dbd7b..6b0a06b1f 100644 --- a/test/redis/internals_test.rb +++ b/test/redis/internals_test.rb @@ -5,13 +5,6 @@ class TestInternals < Minitest::Test include Helper::Client - def test_logger - r.ping - - assert log.string["[Redis] command=PING"] - assert log.string =~ /\[Redis\] call_time=\d+\.\d+ ms/ - end - def test_large_payload # see: https://github.com/redis/redis-rb/issues/962 # large payloads will trigger write_nonblock to write a portion @@ -22,16 +15,6 @@ def test_large_payload assert_equal result, large end - def test_logger_with_pipelining - r.pipelined do - r.set "foo", "bar" - r.get "foo" - end - - assert log.string[" command=SET args=\"foo\" \"bar\""] - assert log.string[" command=GET args=\"foo\""] - end - def test_recovers_from_failed_commands # See https://github.com/redis/redis-rb/issues#issue/28 @@ -65,22 +48,6 @@ def test_timeout Redis.new(OPTIONS.merge(timeout: 0)) end - driver(:ruby) do - def test_tcp_keepalive - keepalive = { time: 20, intvl: 10, probes: 5 } - - redis = Redis.new(OPTIONS.merge(tcp_keepalive: keepalive)) - redis.ping - - connection = redis._client.connection - actual_keepalive = connection.get_tcp_keepalive - - %i[time intvl probes].each do |key| - assert_equal actual_keepalive[key], keepalive[key] if actual_keepalive.key?(key) - end - end - end - def test_time # Test that the difference between the time that Ruby reports and the time # that Redis reports is minimal (prevents the test from being racy). @@ -129,24 +96,6 @@ def test_retry_by_default end end - def test_retry_when_wrapped_in_with_reconnect_true - close_on_ping([0]) do |redis| - redis.with_reconnect(true) do - assert_equal "1", redis.ping - end - end - end - - def test_dont_retry_when_wrapped_in_with_reconnect_false - close_on_ping([0]) do |redis| - assert_raises Redis::ConnectionError do - redis.with_reconnect(false) do - redis.ping - end - end - end - end - def test_dont_retry_when_wrapped_in_without_reconnect close_on_ping([0]) do |redis| assert_raises Redis::ConnectionError do @@ -184,18 +133,26 @@ def test_retry_with_custom_reconnect_attempts_can_still_fail end def test_retry_with_custom_reconnect_attempts_and_exponential_backoff - close_on_ping([0, 1, 2], reconnect_attempts: 3, - reconnect_delay_max: 0.5, - reconnect_delay: 0.01) do |redis| - Kernel.expects(:sleep).with(0.01).returns(true) - Kernel.expects(:sleep).with(0.02).returns(true) - Kernel.expects(:sleep).with(0.04).returns(true) + close_on_ping([0, 1, 2], reconnect_attempts: [0.01, 0.02, 0.04]) do |redis| + redis._client.config.expects(:sleep).with(0.01).returns(true) + redis._client.config.expects(:sleep).with(0.02).returns(true) + redis._client.config.expects(:sleep).with(0.04).returns(true) assert_equal "3", redis.ping end end + def test_retry_pipeline_first_command + close_on_ping([0]) do |redis| + results = redis.pipelined do |pipeline| + pipeline.ping + end + assert_equal ["1"], results + end + end + def test_don_t_retry_when_second_read_in_pipeline_raises_econnreset + skip("TODO: decide if this is really worth it") close_on_ping([1]) do |redis| assert_raises Redis::ConnectionError do redis.pipelined do |pipeline| @@ -203,7 +160,6 @@ def test_don_t_retry_when_second_read_in_pipeline_raises_econnreset pipeline.ping # Second #read times out end end - refute_predicate redis._client, :connected? end end @@ -244,24 +200,6 @@ def test_retry_on_write_error_by_default end end - def test_retry_on_write_error_when_wrapped_in_with_reconnect_true - close_on_connection([0]) do |redis| - redis.with_reconnect(true) do - assert_equal "1", redis._client.call(["x" * 128 * 1024]) - end - end - end - - def test_dont_retry_on_write_error_when_wrapped_in_with_reconnect_false - close_on_connection([0]) do |redis| - assert_raises Redis::ConnectionError do - redis.with_reconnect(false) do - redis._client.call(["x" * 128 * 1024]) - end - end - end - end - def test_dont_retry_on_write_error_when_wrapped_in_without_reconnect close_on_connection([0]) do |redis| assert_raises Redis::ConnectionError do @@ -291,12 +229,11 @@ def test_bubble_timeout_without_retrying end def test_client_options - redis = Redis.new(OPTIONS.merge(host: "host", port: 1234, db: 1, scheme: "foo")) + redis = Redis.new(OPTIONS.merge(host: "host", port: 1234, db: 1)) - assert_equal "host", redis._client.options[:host] - assert_equal 1234, redis._client.options[:port] - assert_equal 1, redis._client.options[:db] - assert_equal "foo", redis._client.options[:scheme] + assert_equal "host", redis._client.host + assert_equal 1234, redis._client.port + assert_equal 1, redis._client.db end def test_resolves_localhost diff --git a/test/redis/pipelining_commands_test.rb b/test/redis/pipelining_commands_test.rb index 72938f232..30502d3ce 100644 --- a/test/redis/pipelining_commands_test.rb +++ b/test/redis/pipelining_commands_test.rb @@ -191,11 +191,13 @@ def test_config_get_in_a_pipeline_returns_hash def test_hgetall_in_a_pipeline_returns_hash r.hmset("hash", "field", "value") + future = nil result = r.pipelined do |p| - p.hgetall("hash") + future = p.hgetall("hash") end - assert_equal result.first, { "field" => "value" } + assert_equal([{ "field" => "value" }], result) + assert_equal({ "field" => "value" }, future.value) end def test_zpopmax_in_a_pipeline_produces_future @@ -242,30 +244,9 @@ def test_pipeline_select assert_equal "2", r.get("db") end - def test_pipeline_select_client_db - r.select 1 - r.pipelined do |p2| - p2.select 2 - end - - assert_equal 2, r._client.db - end - - def test_nested_pipeline_select_client_db - r.select 1 - r.pipelined do |p2| - p2.select 2 - p2.pipelined do |p3| - p3.select 3 - end - end - - assert_equal 3, r._client.db - end - def test_pipeline_interrupt_preserves_client original = r._client - Redis::Pipeline.stubs(:new).raises(Interrupt) + Redis::PipelinedConnection.stubs(:new).raises(Interrupt) assert_raises(Interrupt) { r.pipelined {} } assert_equal r._client, original end diff --git a/test/redis/publish_subscribe_test.rb b/test/redis/publish_subscribe_test.rb index a4a4e5864..88a111c35 100644 --- a/test/redis/publish_subscribe_test.rb +++ b/test/redis/publish_subscribe_test.rb @@ -246,28 +246,24 @@ def test_subscribe_past_a_timeout def test_subscribe_with_timeout received = false - assert_raises Redis::TimeoutError do - r.subscribe_with_timeout(LOW_TIMEOUT, "foo") do |on| - on.message do |_channel, _message| - received = true - end + r.subscribe_with_timeout(LOW_TIMEOUT, "foo") do |on| + on.message do |_channel, _message| + received = true end end - assert !received + refute received end def test_psubscribe_with_timeout received = false - assert_raises Redis::TimeoutError do - r.psubscribe_with_timeout(LOW_TIMEOUT, "f*") do |on| - on.message do |_channel, _message| - received = true - end + r.psubscribe_with_timeout(LOW_TIMEOUT, "f*") do |on| + on.message do |_channel, _message| + received = true end end - assert !received + refute received end end diff --git a/test/redis/ssl_test.rb b/test/redis/ssl_test.rb index b64a797e2..75bf47290 100644 --- a/test/redis/ssl_test.rb +++ b/test/redis/ssl_test.rb @@ -15,13 +15,13 @@ def test_connection_to_non_ssl_server def test_verified_ssl_connection RedisMock.start({ ping: proc { "+PONG" } }, ssl_server_opts("trusted")) do |port| - redis = Redis.new(port: port, ssl: true, ssl_params: { ca_file: ssl_ca_file }) + redis = Redis.new(host: "127.0.0.1", port: port, ssl: true, ssl_params: { ca_file: ssl_ca_file }) assert_equal redis.ping, "PONG" end end def test_unverified_ssl_connection - assert_raises(OpenSSL::SSL::SSLError) do + assert_raises(Redis::CannotConnectError) do RedisMock.start({ ping: proc { "+PONG" } }, ssl_server_opts("untrusted")) do |port| redis = Redis.new(port: port, ssl: true, ssl_params: { ca_file: ssl_ca_file }) redis.ping @@ -30,7 +30,7 @@ def test_unverified_ssl_connection end def test_verify_certificates_by_default - assert_raises(OpenSSL::SSL::SSLError) do + assert_raises(Redis::CannotConnectError) do RedisMock.start({ ping: proc { "+PONG" } }, ssl_server_opts("untrusted")) do |port| redis = Redis.new(port: port, ssl: true) redis.ping @@ -40,7 +40,7 @@ def test_verify_certificates_by_default def test_ssl_blocking RedisMock.start({}, ssl_server_opts("trusted")) do |port| - redis = Redis.new(port: port, ssl: true, ssl_params: { ca_file: ssl_ca_file }) + redis = Redis.new(host: "127.0.0.1", port: port, ssl: true, ssl_params: { ca_file: ssl_ca_file }) assert_equal redis.set("boom", "a" * 10_000_000), "OK" end end diff --git a/test/redis/transactions_test.rb b/test/redis/transactions_test.rb index f7cfcefd3..cd63d1eea 100644 --- a/test/redis/transactions_test.rb +++ b/test/redis/transactions_test.rb @@ -6,16 +6,9 @@ class TestTransactions < Minitest::Test include Helper::Client def test_multi_discard - r.multi - - assert_equal "QUEUED", r.set("foo", "1") - assert_equal "QUEUED", r.get("foo") - assert_equal "QUEUED", r.zincrby("bar", 1, "baz") # Floatify - assert_equal "QUEUED", r.hsetnx("plop", "foo", "bar") # Boolify - - r.discard - - assert_nil r.get("foo") + assert_raises(LocalJumpError) do + r.multi + end end def test_multi_exec_with_a_block @@ -39,8 +32,9 @@ def test_multi_exec_with_a_block_doesn_t_return_replies_for_multi_and_exec def test_multi_in_pipeline foo_future = bar_future = nil + multi_future = nil response = r.pipelined do |pipeline| - pipeline.multi do |multi| + multi_future = pipeline.multi do |multi| multi.set("foo", "s1") foo_future = multi.get("foo") end @@ -51,10 +45,12 @@ def test_multi_in_pipeline end end + assert_equal(["OK", "QUEUED", "QUEUED", ["OK", "s1"], "OK", "QUEUED", "QUEUED", ["OK", "s2"]], response) + + assert_equal ["OK", "s1"], multi_future.value + assert_equal "s1", foo_future.value assert_equal "s2", bar_future.value - - assert_equal(["OK", "QUEUED", "QUEUED", ["OK", "s1"], "OK", "QUEUED", "QUEUED", ["OK", "s2"]], response) end def test_assignment_inside_multi_exec_block @@ -67,20 +63,16 @@ def test_assignment_inside_multi_exec_block assert_equal false, @second.value end - # Although we could support accessing the values in these futures, - # it doesn't make a lot of sense. def test_assignment_inside_multi_exec_block_with_delayed_command_errors assert_raises(Redis::CommandError) do r.multi do |m| @first = m.set("foo", "s1") @second = m.incr("foo") # not an integer - @third = m.lpush("foo", "value") # wrong kind of value end end assert_equal "OK", @first.value - assert_raises(Redis::CommandError) { @second.value } - assert_raises(Redis::FutureNotReady) { @third.value } + assert_raises(Redis::FutureNotReady) { @second.value } end def test_assignment_inside_multi_exec_block_with_immediate_command_errors @@ -220,7 +212,7 @@ def test_multi_with_a_block_yielding_the_client def test_multi_with_interrupt_preserves_client original = r._client - Redis::Pipeline.stubs(:new).raises(Interrupt) + Redis::MultiConnection.stubs(:new).raises(Interrupt) assert_raises(Interrupt) { r.multi {} } assert_equal r._client, original end diff --git a/test/redis/url_param_test.rb b/test/redis/url_param_test.rb index b2560fe70..e14314ac2 100644 --- a/test/redis/url_param_test.rb +++ b/test/redis/url_param_test.rb @@ -5,10 +5,10 @@ class TestUrlParam < Minitest::Test include Helper::Client - def test_url_defaults_to_______________ + def test_url_defaults_to_localhost redis = Redis.new - assert_equal "127.0.0.1", redis._client.host + assert_equal "localhost", redis._client.host assert_equal 6379, redis._client.port assert_equal 0, redis._client.db assert_nil redis._client.password @@ -23,49 +23,20 @@ def test_allows_to_pass_in_a_url assert_equal "secr3t", redis._client.password end - def test_allows_to_pass_in_a_url_with_string_key - redis = Redis.new "url" => "redis://:secr3t@foo.com:999/2" - - assert_equal "foo.com", redis._client.host - assert_equal 999, redis._client.port - assert_equal 2, redis._client.db - assert_equal "secr3t", redis._client.password - end - def test_unescape_password_from_url redis = Redis.new url: "redis://:secr3t%3A@foo.com:999/2" assert_equal "secr3t:", redis._client.password end - def test_unescape_password_from_url_with_string_key - redis = Redis.new "url" => "redis://:secr3t%3A@foo.com:999/2" - - assert_equal "secr3t:", redis._client.password - end - def test_does_not_unescape_password_when_explicitly_passed redis = Redis.new url: "redis://:secr3t%3A@foo.com:999/2", password: "secr3t%3A" assert_equal "secr3t%3A", redis._client.password end - def test_does_not_unescape_password_when_explicitly_passed_with_string_key - redis = Redis.new :url => "redis://:secr3t%3A@foo.com:999/2", "password" => "secr3t%3A" - - assert_equal "secr3t%3A", redis._client.password - end - def test_override_url_if_path_option_is_passed - redis = Redis.new url: "redis://:secr3t@foo.com/foo:999/2", path: "/tmp/redis.sock" - - assert_equal "/tmp/redis.sock", redis._client.path - assert_nil redis._client.host - assert_nil redis._client.port - end - - def test_override_url_if_path_option_is_passed_with_string_key - redis = Redis.new :url => "redis://:secr3t@foo.com/foo:999/2", "path" => "/tmp/redis.sock" + redis = Redis.new url: "redis://:secr3t@foo.com/2", path: "/tmp/redis.sock" assert_equal "/tmp/redis.sock", redis._client.path assert_nil redis._client.host @@ -81,15 +52,6 @@ def test_overrides_url_if_another_connection_option_is_passed assert_equal "secr3t", redis._client.password end - def test_overrides_url_if_another_connection_option_is_passed_with_string_key - redis = Redis.new :url => "redis://:secr3t@foo.com:999/2", "port" => 1000 - - assert_equal "foo.com", redis._client.host - assert_equal 1000, redis._client.port - assert_equal 2, redis._client.db - assert_equal "secr3t", redis._client.password - end - def test_does_not_overrides_url_if_a_nil_option_is_passed redis = Redis.new url: "redis://:secr3t@foo.com:999/2", port: nil @@ -99,15 +61,6 @@ def test_does_not_overrides_url_if_a_nil_option_is_passed assert_equal "secr3t", redis._client.password end - def test_does_not_overrides_url_if_a_nil_option_is_passed_with_string_key - redis = Redis.new :url => "redis://:secr3t@foo.com:999/2", "port" => nil - - assert_equal "foo.com", redis._client.host - assert_equal 999, redis._client.port - assert_equal 2, redis._client.db - assert_equal "secr3t", redis._client.password - end - def test_does_not_modify_the_passed_options options = { url: "redis://:secr3t@foo.com:999/2" } @@ -130,9 +83,9 @@ def test_uses_redis_url_over_default_if_available end def test_defaults_to_localhost - redis = Redis.new(url: "redis:///") + redis = Redis.new(url: "redis://") - assert_equal "127.0.0.1", redis._client.host + assert_equal "localhost", redis._client.host end def test_ipv6_url @@ -144,7 +97,6 @@ def test_ipv6_url def test_user_and_password redis = Redis.new(url: 'redis://johndoe:mysecret@foo.com:999/2') - assert_equal('redis', redis._client.scheme) assert_equal('johndoe', redis._client.username) assert_equal('mysecret', redis._client.password) assert_equal('foo.com', redis._client.host) diff --git a/test/support/redis_mock.rb b/test/support/redis_mock.rb index ab8bc059c..89e47de00 100644 --- a/test/support/redis_mock.rb +++ b/test/support/redis_mock.rb @@ -93,7 +93,7 @@ def self.start(commands, options = {}, &blk) end command = argv.shift - blk = commands[command.to_sym] + blk = commands[command.downcase.to_sym] blk ||= ->(*_) { "+OK" } response = blk.call(*argv)