From e8ceccda172e6019b350c44b69fa5d585d3f477d Mon Sep 17 00:00:00 2001 From: Siavash Safi Date: Thu, 16 Dec 2021 15:45:48 +0100 Subject: [PATCH] Support Redis Cluster Configuration Endpoint The go-redis options requires multiple redis addresses for a cluster. In some scenarios like AWS ElastiCache, a single configuration endpoint is used to resolve all cluster nodes. This change adds an extra step to resolve all nodes and pass them to go-redis. It remove the need to specify multiple cluster nodes in the configuration. No tests were added for this scenario since it requires patching the Go resolver during runtime. Signed-off-by: Siavash Safi --- docs/sources/configuration/_index.md | 2 +- pkg/storage/chunk/cache/cache.go | 7 ++++++- pkg/storage/chunk/cache/redis_client.go | 27 +++++++++++++++++++++---- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index e95ac0797d98..e082df5e4bd0 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -1757,7 +1757,7 @@ memcached_client: [consistent_hash: ] redis: - # Redis Server endpoint to use for caching. A comma-separated list of endpoints + # Redis Server or Cluster configuration endpoint to use for caching. A comma-separated list of endpoints # for Redis Cluster or Redis Sentinel. If empty, no redis will be used. # CLI flag: -.redis.endpoint [endpoint: ] diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index 10418524bbaf..7a66097b8d38 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -4,6 +4,7 @@ import ( "context" "errors" "flag" + "fmt" "time" "github.com/go-kit/log" @@ -113,7 +114,11 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger) (Cache, error cfg.Redis.Expiration = cfg.DefaultValidity } cacheName := cfg.Prefix + "redis" - cache := NewRedisCache(cacheName, NewRedisClient(&cfg.Redis), logger) + client, err := NewRedisClient(&cfg.Redis) + if err != nil { + return nil, fmt.Errorf("redis client setup failed: %w", err) + } + cache := NewRedisCache(cacheName, client, logger) caches = append(caches, NewBackground(cacheName, cfg.Background, Instrument(cacheName, cache, reg), reg)) } diff --git a/pkg/storage/chunk/cache/redis_client.go b/pkg/storage/chunk/cache/redis_client.go index 1c1bd8f18491..1d59611a9626 100644 --- a/pkg/storage/chunk/cache/redis_client.go +++ b/pkg/storage/chunk/cache/redis_client.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "flag" "fmt" + "net" "strings" "time" "unsafe" @@ -31,7 +32,7 @@ type RedisConfig struct { // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *RedisConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { - f.StringVar(&cfg.Endpoint, prefix+"redis.endpoint", "", description+"Redis Server endpoint to use for caching. A comma-separated list of endpoints for Redis Cluster or Redis Sentinel. If empty, no redis will be used.") + f.StringVar(&cfg.Endpoint, prefix+"redis.endpoint", "", description+"Redis Server or Cluster configuration endpoint to use for caching. A comma-separated list of endpoints for Redis Cluster or Redis Sentinel. If empty, no redis will be used.") f.StringVar(&cfg.MasterName, prefix+"redis.master-name", "", description+"Redis Sentinel master name. An empty string for Redis Server or Redis Cluster.") f.DurationVar(&cfg.Timeout, prefix+"redis.timeout", 500*time.Millisecond, description+"Maximum time to wait before giving up on redis requests.") f.DurationVar(&cfg.Expiration, prefix+"redis.expiration", 0, description+"How long keys stay in the redis.") @@ -51,9 +52,27 @@ type RedisClient struct { } // NewRedisClient creates Redis client -func NewRedisClient(cfg *RedisConfig) *RedisClient { +func NewRedisClient(cfg *RedisConfig) (*RedisClient, error) { + endpoints := strings.Split(cfg.Endpoint, ",") + // Handle single configuration endpoint which resolves multiple nodes. + if len(endpoints) == 1 { + host, port, err := net.SplitHostPort(endpoints[0]) + if err != nil { + return nil, err + } + addrs, err := net.LookupHost(host) + if err != nil { + return nil, err + } + if len(addrs) > 1 { + endpoints = nil + for _, addr := range addrs { + endpoints = append(endpoints, net.JoinHostPort(addr, port)) + } + } + } opt := &redis.UniversalOptions{ - Addrs: strings.Split(cfg.Endpoint, ","), + Addrs: endpoints, MasterName: cfg.MasterName, Password: cfg.Password.Value, DB: cfg.DB, @@ -68,7 +87,7 @@ func NewRedisClient(cfg *RedisConfig) *RedisClient { expiration: cfg.Expiration, timeout: cfg.Timeout, rdb: redis.NewUniversalClient(opt), - } + }, nil } func (c *RedisClient) Ping(ctx context.Context) error {