Skip to content

Commit

Permalink
Set error on the commands in case there are no more attempts left
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Feb 2, 2020
1 parent 5edc4c8 commit b6e63b9
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 53 deletions.
124 changes: 71 additions & 53 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,12 +1046,13 @@ func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error

func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error {
cmdsMap := newCmdsMap()
err := c.mapCmdsByNode(cmds, cmdsMap)
err := c.mapCmdsByNode(cmdsMap, cmds)
if err != nil {
setCmdsErr(cmds, err)
return err
}

failedCmds := newCmdsMap()
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
Expand All @@ -1060,7 +1061,6 @@ func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) erro
}
}

failedCmds := newCmdsMap()
var wg sync.WaitGroup

for node, cmds := range cmdsMap.m {
Expand All @@ -1080,11 +1080,15 @@ func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) erro
return c.pipelineReadCmds(node, rd, cmds, failedCmds)
})
})
if err != nil {
err = c.mapCmdsByNode(cmds, failedCmds)
if err != nil {
if err == nil {
return
}
if attempt < c.opt.MaxRedirects {
if err := c.mapCmdsByNode(failedCmds, cmds); err != nil {
setCmdsErr(cmds, err)
}
} else {
setCmdsErr(cmds, err)
}
}(node, cmds)
}
Expand All @@ -1094,46 +1098,33 @@ func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) erro
break
}
cmdsMap = failedCmds
failedCmds.Reset()
}

return cmdsFirstErr(cmds)
}

type cmdsMap struct {
mu sync.Mutex
m map[*clusterNode][]Cmder
}

func newCmdsMap() *cmdsMap {
return &cmdsMap{
m: make(map[*clusterNode][]Cmder),
}
}

func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) {
m.mu.Lock()
m.m[node] = append(m.m[node], cmds...)
m.mu.Unlock()
}

func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
func (c *ClusterClient) mapCmdsByNode(cmdsMap *cmdsMap, cmds []Cmder) error {
state, err := c.state.Get()
if err != nil {
return err
}

cmdsAreReadOnly := c.opt.ReadOnly && c.cmdsAreReadOnly(cmds)
if c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) {
for _, cmd := range cmds {
slot := c.cmdSlot(cmd)
node, err := c.slotReadOnlyNode(state, slot)
if err != nil {
return err
}
cmdsMap.Add(node, cmd)
}
return nil
}

for _, cmd := range cmds {
slot := c.cmdSlot(cmd)

var node *clusterNode
var err error
if cmdsAreReadOnly {
cmdInfo := c.cmdInfo(cmd.Name())
node, err = c.cmdNode(cmdInfo, slot)
} else {
node, err = state.slotMasterNode(slot)
}
node, err := state.slotMasterNode(slot)
if err != nil {
return err
}
Expand Down Expand Up @@ -1237,6 +1228,7 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
}

cmdsMap := map[*clusterNode][]Cmder{node: cmds}
failedCmds := newCmdsMap()
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
Expand All @@ -1245,7 +1237,6 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
}
}

failedCmds := newCmdsMap()
var wg sync.WaitGroup

for node, cmds := range cmdsMap {
Expand All @@ -1261,7 +1252,7 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
return err
}

err = cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
err := c.txPipelineReadQueued(rd, cmds, failedCmds)
if err != nil {
moved, ask, addr := isMovedError(err)
Expand All @@ -1272,13 +1263,16 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
}
return pipelineReadCmds(rd, cmds)
})
return err
})
if err != nil {
err = c.mapCmdsByNode(cmds, failedCmds)
if err != nil {
if err == nil {
return
}
if attempt < c.opt.MaxRedirects {
if err := c.mapCmdsByNode(failedCmds, cmds); err != nil {
setCmdsErr(cmds, err)
}
} else {
setCmdsErr(cmds, err)
}
}(node, cmds)
}
Expand All @@ -1288,6 +1282,7 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
break
}
cmdsMap = failedCmds.m
failedCmds.Reset()
}
}

Expand Down Expand Up @@ -1561,29 +1556,27 @@ func (c *ClusterClient) cmdNode(cmdInfo *CommandInfo, slot int) (*clusterNode, e
}

if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
if c.opt.RouteByLatency {
return state.slotClosestNode(slot)
}
if c.opt.RouteRandomly {
return state.slotRandomNode(slot)
}
return state.slotSlaveNode(slot)
return c.slotReadOnlyNode(state, slot)
}

return state.slotMasterNode(slot)
}

func (c *clusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
if c.opt.RouteByLatency {
return state.slotClosestNode(slot)
}
if c.opt.RouteRandomly {
return state.slotRandomNode(slot)
}
return state.slotSlaveNode(slot)
}

func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {
state, err := c.state.Get()
if err != nil {
return nil, err
}

nodes := state.slotNodes(slot)
if len(nodes) > 0 {
return nodes[0], nil
}
return c.nodes.Random()
return state.slotMasterNode(slot)
}

func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
Expand Down Expand Up @@ -1622,3 +1615,28 @@ func remove(ss []string, es ...string) []string {
}
return ss
}

//------------------------------------------------------------------------------

type cmdsMap struct {
mu sync.Mutex
m map[*clusterNode][]Cmder
}

func newCmdsMap() *cmdsMap {
return &cmdsMap{
m: make(map[*clusterNode][]Cmder),
}
}

func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) {
m.mu.Lock()
m.m[node] = append(m.m[node], cmds...)
m.mu.Unlock()
}

func (m *cmdsMap) Reset() {
for k := range m.m {
delete(m.m, k)
}
}
3 changes: 3 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ func (opt *Options) init() {
opt.IdleCheckFrequency = time.Minute
}

if opt.MaxRetries == -1 {
opt.MaxRetries = 0
}
switch opt.MinRetryBackoff {
case -1:
opt.MinRetryBackoff = 0
Expand Down

0 comments on commit b6e63b9

Please sign in to comment.