Skip to content

Commit

Permalink
new feature : replay command in a pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
ikenchina committed Nov 14, 2024
1 parent d3022ec commit 0d0b0f7
Show file tree
Hide file tree
Showing 14 changed files with 433 additions and 33 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ type ReplayConfig struct {
UpdateCheckpointTicker time.Duration `yaml:"updateCheckpointTicker"`
ReplayTransaction *bool `yaml:"replayTransaction" default:"true"`
Stats OutputStats `yaml:"stats"`
AofPipelineMode bool `yaml:"enableAofPipeline"`
}

func (of *OutputConfig) fix() error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/redis_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (c *redisCluster) Register(ctx context.Context, serviceName string, instanc
}

func (c *redisCluster) Discover(ctx context.Context, serviceName string) ([]string, error) {
bb := c.redisCli.NewBatcher()
bb := c.redisCli.NewBatcher(false)
bb.Put("KEYS", serviceName+"*")
replies, err := bb.Exec()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/redis/client/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func (cc *ClusterRedis) Do(cmd string, args ...interface{}) (interface{}, error)
return cc.client.Do(cmd, args...)
}

func (cc *ClusterRedis) NewBatcher() common.CmdBatcher {
return cc.client.NewBatcher()
func (cc *ClusterRedis) NewBatcher(pipeline bool) common.CmdBatcher {
return cc.client.NewBatcher(pipeline)
}

// @TODO
Expand Down
10 changes: 10 additions & 0 deletions pkg/redis/client/cluster/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type nodeBatch struct {

err error
done chan int

conn *redisConn
}

type nodeCommand struct {
Expand Down Expand Up @@ -216,3 +218,11 @@ func (bat *Batch) doBatch(batch *nodeBatch) {
batch.node.releaseConn(conn)
batch.done <- 1
}

func (bat *Batch) Receive() ([]interface{}, error) {
return nil, errors.ErrUnsupported

Check failure on line 223 in pkg/redis/client/cluster/batch.go

View workflow job for this annotation

GitHub Actions / build

undefined: errors.ErrUnsupported
}

func (bat *Batch) Dispatch() error {
return errors.ErrUnsupported

Check failure on line 227 in pkg/redis/client/cluster/batch.go

View workflow job for this annotation

GitHub Actions / build

undefined: errors.ErrUnsupported
}
245 changes: 245 additions & 0 deletions pkg/redis/client/cluster/batch_pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
package redis

import (
"errors"
"fmt"
"strings"

"github.com/mgtv-tech/redis-GunYu/pkg/redis/client/common"
"github.com/mgtv-tech/redis-GunYu/pkg/util"
)

type batchPipeline struct {
nodeConns map[*redisNode]*redisConn
cluster *Cluster
}

func (bp *batchPipeline) NewBatcher() common.CmdBatcher {
return &batch2{
cluster: bp.cluster,
pipeline: bp,
batches: make([]nodeBatch, 0),
index: make([]int, 0),
}
}

func (bp *batchPipeline) getConn(node *redisNode) (*redisConn, error) {
c, ok := bp.nodeConns[node]
if ok {
if !c.isClosed() {
return c, nil
}
}

c, err := node.getConn()
if err != nil {
return nil, err
}

bp.nodeConns[node] = c
return c, nil
}

func (bp *batchPipeline) closeConn(conn *redisConn) {
conn.shutdown()
}

func (bp *batchPipeline) Close() {
for _, c := range bp.nodeConns {
if !c.isClosed() {
c.shutdown()
}
}
}

type batch2 struct {
pipeline *batchPipeline
cluster *Cluster
batches []nodeBatch
index []int
err error
}

func (tb *batch2) joinError(err error) error {
tb.err = errors.Join(tb.err, err)
return err
}

// Put add a redis command to batch, DO NOT put MGET/MSET/MSETNX.
// it ignores multi/exec transaction
func (batch *batch2) Put(cmd string, args ...interface{}) error {

switch strings.ToUpper(cmd) {
case "KEYS":
nodes := batch.cluster.getAllNodes()

for i, node := range nodes {
batch.batches = append(batch.batches,
nodeBatch{
node: node,
cmds: []nodeCommand{{cmd: cmd, args: args}},
done: make(chan int)})
batch.index = append(batch.index, i)
}
return nil
}

node, err := batch.cluster.ChooseNodeWithCmd(cmd, args...)
if err != nil {
err = fmt.Errorf("run ChooseNodeWithCmd error : %w", err)
return batch.joinError(err)
}
if node == nil {
// node is nil means no need to put
return nil
}

var i int
for i = 0; i < len(batch.batches); i++ {
if batch.batches[i].node == node {
batch.batches[i].cmds = append(batch.batches[i].cmds,
nodeCommand{cmd: cmd, args: args})

batch.index = append(batch.index, i)
break
}
}

if i == len(batch.batches) {
if batch.cluster.transactionEnable && len(batch.batches) == 1 {
return batch.joinError(common.ErrCrossSlots)
}
batch.batches = append(batch.batches,
nodeBatch{
node: node,
cmds: []nodeCommand{{cmd: cmd, args: args}},
done: make(chan int)})
batch.index = append(batch.index, i)
}

return nil
}

func (batch *batch2) GetBatchSize() int {
if batch == nil || batch.index == nil {
return 0
}

return len(batch.index)
}

func (batch *batch2) Len() int {
ll := 0
for _, b := range batch.batches {
ll += len(b.cmds)
}
return ll
}

func (bat *batch2) Exec() ([]interface{}, error) {
return nil, errors.ErrUnsupported

Check failure on line 140 in pkg/redis/client/cluster/batch_pipe.go

View workflow job for this annotation

GitHub Actions / build

undefined: errors.ErrUnsupported
}

func (bat *batch2) Dispatch() error {
if bat.err != nil {
return bat.err
}

if bat == nil || bat.batches == nil || len(bat.batches) == 0 {
return nil
}

for i := range bat.batches {
go bat.doBatch(&bat.batches[i])
}

for i := range bat.batches {
<-bat.batches[i].done
}
return nil
}

func (bat *batch2) doBatch(batch *nodeBatch) {
conn, err := bat.pipeline.getConn(batch.node)
if err != nil {
batch.err = err
batch.done <- 1
return
}

exec := util.OpenCircuitExec{}

for i := range batch.cmds {
exec.Do(func() error { return conn.send(batch.cmds[i].cmd, batch.cmds[i].args...) })
}

err = exec.Do(func() error { return conn.flush() })
if err != nil {
batch.err = err
bat.pipeline.closeConn(conn)
batch.done <- 1
return
}
batch.conn = conn

batch.done <- 1
}

func (bat *batch2) Receive() ([]interface{}, error) {
if bat.err != nil {
return nil, bat.err
}

if bat == nil || bat.batches == nil || len(bat.batches) == 0 {
return []interface{}{}, nil
}

for i := range bat.batches {
go bat.receiveReply(&bat.batches[i])
}

for i := range bat.batches {
<-bat.batches[i].done
}

var replies []interface{}
for _, i := range bat.index {
if bat.batches[i].err != nil {
return nil, bat.batches[i].err
}
replies = append(replies, bat.batches[i].cmds[0].reply)
bat.batches[i].cmds = bat.batches[i].cmds[1:]
}

return replies, nil
}

func (bat *batch2) receiveReply(batch *nodeBatch) {
conn := batch.conn

for i := range batch.cmds {
reply, err := conn.receive()
if err != nil {
if err == common.ErrNil {
continue
}
batch.err = err
bat.pipeline.closeConn(conn)
batch.done <- 1
return
}
reply, err = bat.cluster.handleReply(batch.node, reply, batch.cmds[i].cmd, batch.cmds[i].args...)
// @TODO
// 这个cmd没有执行成功,那么后面的可能已经成功了。如果直接断开,则会造成上层以为都失败了。
if err != nil {
batch.err = err
bat.pipeline.closeConn(conn)
batch.done <- 1
return
}

batch.cmds[i].reply, batch.cmds[i].err = reply, err
}

batch.done <- 1
}
8 changes: 4 additions & 4 deletions pkg/redis/client/cluster/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ func TestBatcherKeysCmd(t *testing.T) {
}

cluster := newRedisNodeCluster(t)
bb := cluster.NewBatcher()
bb := cluster.NewBatcher(false)
for k, v := range cases {
bb.Put("SET", k, v)
}
bb.Exec()

bb = cluster.NewBatcher()
bb = cluster.NewBatcher(false)
bb.Put("KEYS", "aa*")

res, err := bb.Exec()
Expand All @@ -51,7 +51,7 @@ func TestBatcher(t *testing.T) {
cluster := newRedisNodeCluster(t)

t.Run("", func(t *testing.T) {
bb := cluster.NewBatcher()
bb := cluster.NewBatcher(false)
bb.Put("multi")
bb.Put("SET", "aa", 1)
bb.Put("SET", "aa", 2)
Expand All @@ -71,7 +71,7 @@ func TestBatcher(t *testing.T) {
})

t.Run("", func(t *testing.T) {
bb := cluster.NewBatcher()
bb := cluster.NewBatcher(false)
bb.Put("SET", "aa", 1)
bb.Put("SET", "aa", 2)
bb.Put("SET", "bb", 3)
Expand Down
15 changes: 12 additions & 3 deletions pkg/redis/client/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ type Options struct {
// cache and update cluster info, and execute all kinds of commands.
// Multiple goroutines may invoke methods on a cluster simutaneously.
type Cluster struct {
slots [kClusterSlots]*redisNode
nodes map[string]*redisNode
slots [kClusterSlots]*redisNode
nodes map[string]*redisNode
pipeline *batchPipeline

connTimeout time.Duration
readTimeout time.Duration
Expand Down Expand Up @@ -107,6 +108,10 @@ func NewCluster(options *Options) (*Cluster, error) {
logger: log.WithLogger(config.LogModuleName("[redis cluster] ")),
safeRand: util.NewSafeRand(time.Now().Unix()),
}
cluster.pipeline = &batchPipeline{
nodeConns: make(map[*redisNode]*redisConn),
cluster: cluster,
}

errList := make([]error, 0)
for i := range options.StartNodes {
Expand Down Expand Up @@ -258,7 +263,10 @@ func (cluster *Cluster) handleReply(node *redisNode, reply interface{}, cmd stri
panic("unreachable")
}

func (cluster *Cluster) NewBatcher() common.CmdBatcher {
func (cluster *Cluster) NewBatcher(pipeline bool) common.CmdBatcher {
if pipeline {
return cluster.pipeline.NewBatcher()
}
return cluster.NewBatch()
}

Expand All @@ -270,6 +278,7 @@ func (cluster *Cluster) Close() {
if cluster.closed.CompareAndSwap(false, true) {
close(cluster.closeCh)
close(cluster.updateList)
cluster.pipeline.Close()
for addr, node := range cluster.nodes {
node.shutdown()
delete(cluster.nodes, addr)
Expand Down
13 changes: 10 additions & 3 deletions pkg/redis/client/cluster/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ var (
)

type redisConn struct {
c net.Conn
t time.Time
closed atomic.Bool
c net.Conn
t time.Time

br *bufio.Reader
bw *bufio.Writer
Expand Down Expand Up @@ -65,7 +66,13 @@ func (conn *redisConn) auth(password string) (err error) {
}

func (conn *redisConn) shutdown() {
conn.c.Close()
if conn.closed.CompareAndSwap(false, true) {
conn.c.Close()
}
}

func (conn *redisConn) isClosed() bool {
return conn.closed.Load()
}

func (conn *redisConn) send(cmd string, args ...interface{}) error {
Expand Down
Loading

0 comments on commit 0d0b0f7

Please sign in to comment.