Skip to content

Commit

Permalink
FailoverClient with read-only support (#1199)
Browse files Browse the repository at this point in the history
* FailoverClient with read-only support
  • Loading branch information
nextsux committed Sep 4, 2020
1 parent 1e8d282 commit 49aac99
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 1 deletion.
3 changes: 3 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ type Options struct {
// Enables read only queries on slave nodes.
readOnly bool

// Enables read only queries on redis replicas in sentinel mode
sentinelReadOnly bool

// TLS Config to use. When set TLS will be negotiated.
TLSConfig *tls.Config

Expand Down
137 changes: 136 additions & 1 deletion sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"errors"
"math/rand"
"net"
"strings"
"sync"
Expand Down Expand Up @@ -50,6 +51,9 @@ type FailoverOptions struct {
IdleCheckFrequency time.Duration

TLSConfig *tls.Config

// Enables read-only commands on slave nodes.
ReadOnly bool
}

func (opt *FailoverOptions) options() *Options {
Expand Down Expand Up @@ -79,6 +83,8 @@ func (opt *FailoverOptions) options() *Options {
MaxConnAge: opt.MaxConnAge,

TLSConfig: opt.TLSConfig,

sentinelReadOnly: opt.ReadOnly,
}
}

Expand Down Expand Up @@ -325,7 +331,15 @@ func (c *sentinelFailover) Pool() *pool.ConnPool {
}

func (c *sentinelFailover) dial(ctx context.Context, network, _ string) (net.Conn, error) {
addr, err := c.MasterAddr(ctx)
var addr string
var err error

if c.opt.sentinelReadOnly {
addr, err = c.RandomSlaveAddr(ctx)
} else {
addr, err = c.MasterAddr(ctx)
}

if err != nil {
return nil, err
}
Expand All @@ -344,6 +358,17 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
return addr, nil
}

func (c *sentinelFailover) RandomSlaveAddr(ctx context.Context) (string, error) {
addresses, err := c.slaveAddresses(ctx)
if err != nil {
return "", err
}
if len(addresses) < 1 {
return c.MasterAddr(ctx)
}
return addresses[rand.Intn(len(addresses))], nil
}

func (c *sentinelFailover) masterAddr(ctx context.Context) (string, error) {
c.mu.RLock()
sentinel := c.sentinel
Expand Down Expand Up @@ -408,6 +433,70 @@ func (c *sentinelFailover) masterAddr(ctx context.Context) (string, error) {
return "", errors.New("redis: all sentinels are unreachable")
}

func (c *sentinelFailover) slaveAddresses(ctx context.Context) ([]string, error) {
c.mu.RLock()
sentinel := c.sentinel
c.mu.RUnlock()

if sentinel != nil {
addrs := c.getSlaveAddrs(ctx, sentinel)
if len(addrs) > 0 {
return addrs, nil
}
}

c.mu.Lock()
defer c.mu.Unlock()

if c.sentinel != nil {
addrs := c.getSlaveAddrs(ctx, c.sentinel)
if len(addrs) > 0 {
return addrs, nil
}
_ = c.closeSentinel()
}

for i, sentinelAddr := range c.sentinelAddrs {
sentinel := NewSentinelClient(&Options{
Addr: sentinelAddr,
Dialer: c.opt.Dialer,

Username: c.opt.Username,
Password: c.opt.Password,

MaxRetries: c.opt.MaxRetries,

DialTimeout: c.opt.DialTimeout,
ReadTimeout: c.opt.ReadTimeout,
WriteTimeout: c.opt.WriteTimeout,

PoolSize: c.opt.PoolSize,
PoolTimeout: c.opt.PoolTimeout,
IdleTimeout: c.opt.IdleTimeout,
IdleCheckFrequency: c.opt.IdleCheckFrequency,

TLSConfig: c.opt.TLSConfig,
})

slaves, err := sentinel.Slaves(ctx, c.masterName).Result()
if err != nil {
internal.Logger.Printf(ctx, "sentinel: Slaves master=%q failed: %s",
c.masterName, err)
_ = sentinel.Close()
continue
}

// Push working sentinel to the top.
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
c.setSentinel(ctx, sentinel)

addrs := parseSlaveAddresses(slaves)
return addrs, nil
}

return []string{}, errors.New("redis: all sentinels are unreachable")
}

func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) string {
addr, err := sentinel.GetMasterAddrByName(ctx, c.masterName).Result()
if err != nil {
Expand All @@ -418,6 +507,52 @@ func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *Sentinel
return net.JoinHostPort(addr[0], addr[1])
}

func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *SentinelClient) []string {
addrs, err := sentinel.Slaves(ctx, c.masterName).Result()
if err != nil {
internal.Logger.Printf(ctx, "sentinel: Slaves name=%q failed: %s",
c.masterName, err)
return []string{}
}

return parseSlaveAddresses(addrs)
}

func parseSlaveAddresses(addrs []interface{}) []string {
nodes := []string{}

for _, node := range addrs {
ip := ""
port := ""
flags := []string{}
lastkey := ""
isDown := false

for _, key := range node.([]interface{}) {
switch lastkey {
case "ip":
ip = key.(string)
case "port":
port = key.(string)
case "flags":
flags = strings.Split(key.(string), ",")
}
lastkey = key.(string)
}
for _, flag := range flags {
switch flag {
case "s_down", "o_down", "disconnected":
isDown = true
}
}
if !isDown {
nodes = append(nodes, net.JoinHostPort(ip, port))
}
}

return nodes
}

func (c *sentinelFailover) switchMaster(ctx context.Context, addr string) {
c.mu.RLock()
masterAddr := c._masterAddr
Expand Down

0 comments on commit 49aac99

Please sign in to comment.