Skip to content

Commit

Permalink
Merge pull request redis#588 from go-redis/fix/sentinel-reset-pool
Browse files Browse the repository at this point in the history
Resent client pool when sentinel switches master
  • Loading branch information
vmihailenco authored Jun 29, 2017
2 parents 75ceb98 + 0d94a7b commit b52814f
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 185 deletions.
71 changes: 26 additions & 45 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,47 +140,6 @@ func (p *ConnPool) lastDialError() error {
return p._lastDialError.Load().(error)
}

func (p *ConnPool) PopFree() *Conn {
select {
case p.queue <- struct{}{}:
default:
timer := timers.Get().(*time.Timer)
timer.Reset(p.opt.PoolTimeout)

select {
case p.queue <- struct{}{}:
if !timer.Stop() {
<-timer.C
}
timers.Put(timer)
case <-timer.C:
timers.Put(timer)
atomic.AddUint32(&p.stats.Timeouts, 1)
return nil
}
}

p.freeConnsMu.Lock()
cn := p.popFree()
p.freeConnsMu.Unlock()

if cn == nil {
<-p.queue
}
return cn
}

func (p *ConnPool) popFree() *Conn {
if len(p.freeConns) == 0 {
return nil
}

idx := len(p.freeConns) - 1
cn := p.freeConns[idx]
p.freeConns = p.freeConns[:idx]
return cn
}

// Get returns existed connection from the pool or creates a new one.
func (p *ConnPool) Get() (*Conn, bool, error) {
if p.closed() {
Expand Down Expand Up @@ -235,6 +194,17 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
return newcn, true, nil
}

func (p *ConnPool) popFree() *Conn {
if len(p.freeConns) == 0 {
return nil
}

idx := len(p.freeConns) - 1
cn := p.freeConns[idx]
p.freeConns = p.freeConns[:idx]
return cn
}

func (p *ConnPool) Put(cn *Conn) error {
if data := cn.Rd.PeekBuffered(); data != nil {
internal.Logf("connection has unread data: %q", data)
Expand Down Expand Up @@ -303,17 +273,28 @@ func (p *ConnPool) closed() bool {
return atomic.LoadUint32(&p._closed) == 1
}

func (p *ConnPool) Filter(fn func(*Conn) bool) error {
var firstErr error
p.connsMu.Lock()
for _, cn := range p.conns {
if fn(cn) {
if err := p.closeConn(cn); err != nil && firstErr == nil {
firstErr = err
}
}
}
p.connsMu.Unlock()
return firstErr
}

func (p *ConnPool) Close() error {
if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
return ErrClosed
}

p.connsMu.Lock()
var firstErr error
p.connsMu.Lock()
for _, cn := range p.conns {
if cn == nil {
continue
}
if err := p.closeConn(cn); err != nil && firstErr == nil {
firstErr = err
}
Expand Down
26 changes: 0 additions & 26 deletions internal/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,30 +238,4 @@ var _ = Describe("race", func() {
}
})
})

It("does not happen on Get and PopFree", func() {
connPool = pool.NewConnPool(
&pool.Options{
Dialer: dummyDialer,
PoolSize: 10,
PoolTimeout: time.Minute,
IdleTimeout: time.Second,
IdleCheckFrequency: time.Millisecond,
})

perform(C, func(id int) {
for i := 0; i < N; i++ {
cn, _, err := connPool.Get()
Expect(err).NotTo(HaveOccurred())
if err == nil {
Expect(connPool.Put(cn)).NotTo(HaveOccurred())
}

cn = connPool.PopFree()
if cn != nil {
Expect(connPool.Put(cn)).NotTo(HaveOccurred())
}
}
})
})
})
4 changes: 4 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ var cluster = &clusterScenario{
clients: make(map[string]*redis.Client, 6),
}

func init() {
//redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
}

var _ = BeforeSuite(func() {
var err error

Expand Down
113 changes: 56 additions & 57 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,54 +19,53 @@ import (
type PubSub struct {
base baseClient

mu sync.Mutex
cn *pool.Conn
closed bool

subMu sync.Mutex
mu sync.Mutex
cn *pool.Conn
channels []string
patterns []string
closed bool

cmd *Cmd
}

func (c *PubSub) conn() (*pool.Conn, bool, error) {
func (c *PubSub) conn() (*pool.Conn, error) {
c.mu.Lock()
defer c.mu.Unlock()
cn, err := c._conn()
c.mu.Unlock()
return cn, err
}

func (c *PubSub) _conn() (*pool.Conn, error) {
if c.closed {
return nil, false, pool.ErrClosed
return nil, pool.ErrClosed
}

if c.cn != nil {
return c.cn, false, nil
return c.cn, nil
}

cn, err := c.base.connPool.NewConn()
if err != nil {
return nil, false, err
return nil, err
}

if !cn.Inited {
if err := c.base.initConn(cn); err != nil {
_ = c.base.connPool.CloseConn(cn)
return nil, false, err
return nil, err
}
}

if err := c.resubscribe(cn); err != nil {
_ = c.base.connPool.CloseConn(cn)
return nil, false, err
return nil, err
}

c.cn = cn
return cn, true, nil
return cn, nil
}

func (c *PubSub) resubscribe(cn *pool.Conn) error {
c.subMu.Lock()
defer c.subMu.Unlock()

var firstErr error
if len(c.channels) > 0 {
if err := c._subscribe(cn, "subscribe", c.channels...); err != nil && firstErr == nil {
Expand All @@ -81,6 +80,18 @@ func (c *PubSub) resubscribe(cn *pool.Conn) error {
return firstErr
}

func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error {
args := make([]interface{}, 1+len(channels))
args[0] = redisCmd
for i, channel := range channels {
args[1+i] = channel
}
cmd := NewSliceCmd(args...)

cn.SetWriteTimeout(c.base.opt.WriteTimeout)
return writeCmd(cn, cmd)
}

func (c *PubSub) putConn(cn *pool.Conn, err error) {
if !internal.IsBadConn(err, true) {
return
Expand Down Expand Up @@ -114,67 +125,55 @@ func (c *PubSub) Close() error {
return nil
}

func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
cn, isNew, err := c.conn()
if err != nil {
return err
}

if isNew {
return nil
}

err = c._subscribe(cn, redisCmd, channels...)
c.putConn(cn, err)
return err
}

func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error {
args := make([]interface{}, 1+len(channels))
args[0] = redisCmd
for i, channel := range channels {
args[1+i] = channel
}
cmd := NewSliceCmd(args...)

cn.SetWriteTimeout(c.base.opt.WriteTimeout)
return writeCmd(cn, cmd)
}

// Subscribes the client to the specified channels. It returns
// empty subscription if there are no channels.
func (c *PubSub) Subscribe(channels ...string) error {
c.subMu.Lock()
c.mu.Lock()
err := c.subscribe("subscribe", channels...)
c.channels = appendIfNotExists(c.channels, channels...)
c.subMu.Unlock()
return c.subscribe("subscribe", channels...)
c.mu.Unlock()
return err
}

// Subscribes the client to the given patterns. It returns
// empty subscription if there are no patterns.
func (c *PubSub) PSubscribe(patterns ...string) error {
c.subMu.Lock()
c.mu.Lock()
err := c.subscribe("psubscribe", patterns...)
c.patterns = appendIfNotExists(c.patterns, patterns...)
c.subMu.Unlock()
return c.subscribe("psubscribe", patterns...)
c.mu.Unlock()
return err
}

// Unsubscribes the client from the given channels, or from all of
// them if none is given.
func (c *PubSub) Unsubscribe(channels ...string) error {
c.subMu.Lock()
c.mu.Lock()
err := c.subscribe("unsubscribe", channels...)
c.channels = remove(c.channels, channels...)
c.subMu.Unlock()
return c.subscribe("unsubscribe", channels...)
c.mu.Unlock()
return err
}

// Unsubscribes the client from the given patterns, or from all of
// them if none is given.
func (c *PubSub) PUnsubscribe(patterns ...string) error {
c.subMu.Lock()
c.mu.Lock()
err := c.subscribe("punsubscribe", patterns...)
c.patterns = remove(c.patterns, patterns...)
c.subMu.Unlock()
return c.subscribe("punsubscribe", patterns...)
c.mu.Unlock()
return err
}

func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
cn, err := c._conn()
if err != nil {
return err
}

err = c._subscribe(cn, redisCmd, channels...)
c.putConn(cn, err)
return err
}

func (c *PubSub) Ping(payload ...string) error {
Expand All @@ -184,7 +183,7 @@ func (c *PubSub) Ping(payload ...string) error {
}
cmd := NewCmd(args...)

cn, _, err := c.conn()
cn, err := c.conn()
if err != nil {
return err
}
Expand Down Expand Up @@ -277,7 +276,7 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
c.cmd = NewCmd()
}

cn, _, err := c.conn()
cn, err := c.conn()
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit b52814f

Please sign in to comment.