Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new feature : replay command in a pipeline #74

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ type ReplayConfig struct {
UpdateCheckpointTicker time.Duration `yaml:"updateCheckpointTicker"`
ReplayTransaction *bool `yaml:"replayTransaction" default:"true"`
Stats OutputStats `yaml:"stats"`
AofPipelineMode bool `yaml:"enableAofPipeline"`
}

func (of *OutputConfig) fix() error {
Expand Down
2 changes: 1 addition & 1 deletion docs/sync_configuration_en.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ The output configuration is as follows:
- replayRdbParallel: Number of threads used for replaying RDB. The default is the CPU count multiplied by 4.
- updateCheckpointTicker: Default: 1 second.
- keepaliveTicker: Default: 3 seconds. Interval for keeping the heartbeat.

- enableAofPipeline : Replay commands in a pipeline. Commands are sent and replies received in different threads; while this can speed up data synchronization, it may lead to data inconsistency. Enable this feature with caution.


#### Filter configuration
Expand Down
1 change: 1 addition & 0 deletions docs/sync_configuration_zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ output配置如下:
- replayRdbParallel : 用几个线程来回放RDB,默认为CPU数量 * 4
- updateCheckpointTicker : 默认1秒
- keepaliveTicker : 默认3秒,保持心跳时间间隔
- enableAofPipeline : 开启pipeline的方式回放命令。发送命令和接收回复在不同的线程,能加快数据同步速度,但也可能造成数据不一致。例如:发送命令A、B到redis,如果A执行失败,B执行成功,那么另一个线程接收到A执行失败时可能B已经执行完了,而同步的偏移量已经记录成B的了,那A数据就丢失了。谨慎开启。


#### filter配置
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/redis_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (c *redisCluster) Register(ctx context.Context, serviceName string, instanc
}

func (c *redisCluster) Discover(ctx context.Context, serviceName string) ([]string, error) {
bb := c.redisCli.NewBatcher()
bb := c.redisCli.NewBatcher(false)
bb.Put("KEYS", serviceName+"*")
replies, err := bb.Exec()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/redis/client/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func (cc *ClusterRedis) Do(cmd string, args ...interface{}) (interface{}, error)
return cc.client.Do(cmd, args...)
}

func (cc *ClusterRedis) NewBatcher() common.CmdBatcher {
return cc.client.NewBatcher()
// NewBatcher forwards batcher creation to the underlying cluster client.
// When pipeline is true, the returned batcher sends commands and receives
// replies in separate steps (Dispatch/Receive) instead of a single Exec.
func (cc *ClusterRedis) NewBatcher(pipeline bool) common.CmdBatcher {
	return cc.client.NewBatcher(pipeline)
}

// @TODO
Expand Down
10 changes: 10 additions & 0 deletions pkg/redis/client/cluster/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type nodeBatch struct {

err error
done chan int

conn *redisConn
}

type nodeCommand struct {
Expand Down Expand Up @@ -216,3 +218,11 @@ func (bat *Batch) doBatch(batch *nodeBatch) {
batch.node.releaseConn(conn)
batch.done <- 1
}

// Receive is not supported by the plain (non-pipeline) Batch; callers must
// use Exec instead. Pipeline-mode batchers implement Receive for real.
func (bat *Batch) Receive() ([]interface{}, error) {
	return nil, common.ErrUnsupported
}

// Dispatch is not supported by the plain (non-pipeline) Batch; callers must
// use Exec instead. Pipeline-mode batchers implement Dispatch for real.
func (bat *Batch) Dispatch() error {
	return common.ErrUnsupported
}
245 changes: 245 additions & 0 deletions pkg/redis/client/cluster/batch_pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
package redis

import (
	"errors"
	"fmt"
	"strings"
	"sync"

	"github.com/mgtv-tech/redis-GunYu/pkg/redis/client/common"
	"github.com/mgtv-tech/redis-GunYu/pkg/util"
)

// batchPipeline caches one pipelined connection per cluster node so that
// successive batches can reuse connections across Dispatch/Receive cycles.
//
// getConn is invoked concurrently by the per-node goroutines that
// batch2.Dispatch spawns, so access to nodeConns must be serialized:
// an unguarded map would be a data race (concurrent map read/write).
type batchPipeline struct {
	mu        sync.Mutex // guards nodeConns
	nodeConns map[*redisNode]*redisConn
	cluster   *Cluster
}

// NewBatcher returns a pipeline-mode batcher bound to this pipeline's
// cached per-node connections.
func (bp *batchPipeline) NewBatcher() common.CmdBatcher {
	return &batch2{
		cluster:  bp.cluster,
		pipeline: bp,
		batches:  make([]nodeBatch, 0),
		index:    make([]int, 0),
	}
}

// getConn returns the cached connection for node, replacing it with a
// freshly obtained one when there is no cached entry or it is closed.
func (bp *batchPipeline) getConn(node *redisNode) (*redisConn, error) {
	bp.mu.Lock()
	defer bp.mu.Unlock()

	if c, ok := bp.nodeConns[node]; ok && !c.isClosed() {
		return c, nil
	}

	c, err := node.getConn()
	if err != nil {
		return nil, err
	}

	bp.nodeConns[node] = c
	return c, nil
}

// closeConn shuts the connection down. The entry stays in nodeConns until
// getConn notices it is closed and replaces it.
func (bp *batchPipeline) closeConn(conn *redisConn) {
	conn.shutdown()
}

// Close shuts down every cached connection still open.
func (bp *batchPipeline) Close() {
	bp.mu.Lock()
	defer bp.mu.Unlock()
	for _, c := range bp.nodeConns {
		if !c.isClosed() {
			c.shutdown()
		}
	}
}

// batch2 is the pipeline-mode batcher: commands are queued per node with
// Put, written to the sockets by Dispatch, and replies drained separately
// by Receive (send and receive may run on different threads).
type batch2 struct {
	pipeline *batchPipeline // shared per-node connection cache
	cluster  *Cluster
	batches  []nodeBatch // one entry per target node (KEYS fans out to all nodes)
	index    []int       // for each Put, the position in batches it was queued to
	err      error       // accumulated Put/routing errors (see joinError)
}

// joinError folds err into the batcher's accumulated error (errors.Join)
// and returns err unchanged so callers can propagate it directly.
func (tb *batch2) joinError(err error) error {
	tb.err = errors.Join(tb.err, err)
	return err
}

// Put adds a redis command to the batch, DO NOT put MGET/MSET/MSETNX.
// It ignores multi/exec transactions. "KEYS" is fanned out to every node;
// all other commands are routed to a single node by key slot.
func (batch *batch2) Put(cmd string, args ...interface{}) error {

	switch strings.ToUpper(cmd) {
	case "KEYS":
		nodes := batch.cluster.getAllNodes()

		// Record the position of each new nodeBatch inside batch.batches
		// (len before append), NOT the node's position in nodes: if other
		// commands were Put first, batches is already non-empty and the two
		// indices diverge, which would make Receive pick wrong replies.
		for _, node := range nodes {
			batch.index = append(batch.index, len(batch.batches))
			batch.batches = append(batch.batches,
				nodeBatch{
					node: node,
					cmds: []nodeCommand{{cmd: cmd, args: args}},
					done: make(chan int)})
		}
		return nil
	}

	node, err := batch.cluster.ChooseNodeWithCmd(cmd, args...)
	if err != nil {
		err = fmt.Errorf("run ChooseNodeWithCmd error : %w", err)
		return batch.joinError(err)
	}
	if node == nil {
		// node is nil means no need to put
		return nil
	}

	// Append to the existing nodeBatch for this node, if any.
	var i int
	for i = 0; i < len(batch.batches); i++ {
		if batch.batches[i].node == node {
			batch.batches[i].cmds = append(batch.batches[i].cmds,
				nodeCommand{cmd: cmd, args: args})

			batch.index = append(batch.index, i)
			break
		}
	}

	// No batch for this node yet: start one. With transactions enabled a
	// batch must stay on a single node, so a second node is a cross-slot error.
	if i == len(batch.batches) {
		if batch.cluster.transactionEnable && len(batch.batches) == 1 {
			return batch.joinError(common.ErrCrossSlots)
		}
		batch.batches = append(batch.batches,
			nodeBatch{
				node: node,
				cmds: []nodeCommand{{cmd: cmd, args: args}},
				done: make(chan int)})
		batch.index = append(batch.index, i)
	}

	return nil
}

// GetBatchSize reports how many commands have been queued via Put.
func (batch *batch2) GetBatchSize() int {
	if batch == nil {
		return 0
	}
	// len of a nil slice is 0, so no separate nil check on index is needed.
	return len(batch.index)
}

// Len returns the total number of commands across all node batches.
func (batch *batch2) Len() int {
	total := 0
	for i := range batch.batches {
		total += len(batch.batches[i].cmds)
	}
	return total
}

// Exec is not supported in pipeline mode; use Dispatch followed by Receive.
func (bat *batch2) Exec() ([]interface{}, error) {
	return nil, common.ErrUnsupported
}

// Dispatch flushes all buffered commands to their nodes, one goroutine per
// node batch, and waits until every batch has been written. Per-batch write
// errors are recorded on the nodeBatch and surfaced later by Receive.
func (bat *batch2) Dispatch() error {
	// The nil-receiver guard must run before any field access; the previous
	// ordering read bat.err first, which would panic on a nil bat.
	if bat == nil {
		return nil
	}
	if bat.err != nil {
		return bat.err
	}
	if len(bat.batches) == 0 {
		return nil
	}

	for i := range bat.batches {
		go bat.doBatch(&bat.batches[i])
	}

	for i := range bat.batches {
		<-bat.batches[i].done
	}
	return nil
}

// doBatch writes every queued command of one node batch onto a pipelined
// connection and flushes it. Replies are NOT read here; the connection is
// stashed in batch.conn for receiveReply to drain later. Completion
// (success or failure) is always signalled once on batch.done.
func (bat *batch2) doBatch(batch *nodeBatch) {
	conn, err := bat.pipeline.getConn(batch.node)
	if err != nil {
		batch.err = err
		batch.done <- 1
		return
	}

	// NOTE(review): presumably OpenCircuitExec short-circuits further Do
	// calls after the first failure, so the first send/flush error wins —
	// confirm against util.OpenCircuitExec.
	exec := util.OpenCircuitExec{}

	for i := range batch.cmds {
		exec.Do(func() error { return conn.send(batch.cmds[i].cmd, batch.cmds[i].args...) })
	}

	err = exec.Do(func() error { return conn.flush() })
	if err != nil {
		batch.err = err
		// connection state is unknown after a write error; drop it so
		// getConn hands out a fresh one next time
		bat.pipeline.closeConn(conn)
		batch.done <- 1
		return
	}
	// On failure batch.conn stays nil — readers must check batch.err before
	// using the connection.
	batch.conn = conn

	batch.done <- 1
}

// Receive drains the replies for every node batch written by Dispatch, one
// goroutine per node, then stitches them back into Put order using index.
// It returns the first per-batch error encountered, if any.
func (bat *batch2) Receive() ([]interface{}, error) {
	// The nil-receiver guard must run before any field access; the previous
	// ordering read bat.err first, which would panic on a nil bat.
	if bat == nil {
		return []interface{}{}, nil
	}
	if bat.err != nil {
		return nil, bat.err
	}
	if len(bat.batches) == 0 {
		return []interface{}{}, nil
	}

	for i := range bat.batches {
		go bat.receiveReply(&bat.batches[i])
	}

	for i := range bat.batches {
		<-bat.batches[i].done
	}

	// index maps each Put call to its node batch; consume each batch's cmds
	// from the front so replies come out in Put order.
	var replies []interface{}
	for _, i := range bat.index {
		if bat.batches[i].err != nil {
			return nil, bat.batches[i].err
		}
		replies = append(replies, bat.batches[i].cmds[0].reply)
		bat.batches[i].cmds = bat.batches[i].cmds[1:]
	}

	return replies, nil
}

// receiveReply reads one reply per queued command from the batch's pipelined
// connection, running each through cluster redirection handling. Completion
// is always signalled once on batch.done.
func (bat *batch2) receiveReply(batch *nodeBatch) {
	// If Dispatch failed for this node batch, no connection was recorded
	// (doBatch leaves batch.conn nil on error); bail out instead of
	// dereferencing a nil conn.
	if batch.err != nil || batch.conn == nil {
		batch.done <- 1
		return
	}
	conn := batch.conn

	for i := range batch.cmds {
		reply, err := conn.receive()
		if err != nil {
			if err == common.ErrNil {
				// nil reply: leave this command's reply empty and move on
				continue
			}
			batch.err = err
			bat.pipeline.closeConn(conn)
			batch.done <- 1
			return
		}
		reply, err = bat.cluster.handleReply(batch.node, reply, batch.cmds[i].cmd, batch.cmds[i].args...)
		// @TODO if this command failed, later ones may already have succeeded
		// on the server; closing the connection here makes the caller believe
		// the whole batch failed even though some commands were applied.
		if err != nil {
			batch.err = err
			bat.pipeline.closeConn(conn)
			batch.done <- 1
			return
		}

		batch.cmds[i].reply, batch.cmds[i].err = reply, err
	}

	batch.done <- 1
}
8 changes: 4 additions & 4 deletions pkg/redis/client/cluster/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ func TestBatcherKeysCmd(t *testing.T) {
}

cluster := newRedisNodeCluster(t)
bb := cluster.NewBatcher()
bb := cluster.NewBatcher(false)
for k, v := range cases {
bb.Put("SET", k, v)
}
bb.Exec()

bb = cluster.NewBatcher()
bb = cluster.NewBatcher(false)
bb.Put("KEYS", "aa*")

res, err := bb.Exec()
Expand All @@ -51,7 +51,7 @@ func TestBatcher(t *testing.T) {
cluster := newRedisNodeCluster(t)

t.Run("", func(t *testing.T) {
bb := cluster.NewBatcher()
bb := cluster.NewBatcher(false)
bb.Put("multi")
bb.Put("SET", "aa", 1)
bb.Put("SET", "aa", 2)
Expand All @@ -71,7 +71,7 @@ func TestBatcher(t *testing.T) {
})

t.Run("", func(t *testing.T) {
bb := cluster.NewBatcher()
bb := cluster.NewBatcher(false)
bb.Put("SET", "aa", 1)
bb.Put("SET", "aa", 2)
bb.Put("SET", "bb", 3)
Expand Down
15 changes: 12 additions & 3 deletions pkg/redis/client/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ type Options struct {
// cache and update cluster info, and execute all kinds of commands.
// Multiple goroutines may invoke methods on a cluster simutaneously.
type Cluster struct {
slots [kClusterSlots]*redisNode
nodes map[string]*redisNode
slots [kClusterSlots]*redisNode
nodes map[string]*redisNode
pipeline *batchPipeline

connTimeout time.Duration
readTimeout time.Duration
Expand Down Expand Up @@ -107,6 +108,10 @@ func NewCluster(options *Options) (*Cluster, error) {
logger: log.WithLogger(config.LogModuleName("[redis cluster] ")),
safeRand: util.NewSafeRand(time.Now().Unix()),
}
cluster.pipeline = &batchPipeline{
nodeConns: make(map[*redisNode]*redisConn),
cluster: cluster,
}

errList := make([]error, 0)
for i := range options.StartNodes {
Expand Down Expand Up @@ -258,7 +263,10 @@ func (cluster *Cluster) handleReply(node *redisNode, reply interface{}, cmd stri
panic("unreachable")
}

func (cluster *Cluster) NewBatcher() common.CmdBatcher {
// NewBatcher returns a command batcher for this cluster. When pipeline is
// true it hands out a pipeline-mode batcher backed by the cluster's shared
// batchPipeline (split Dispatch/Receive); otherwise a plain Batch is used.
func (cluster *Cluster) NewBatcher(pipeline bool) common.CmdBatcher {
	if pipeline {
		return cluster.pipeline.NewBatcher()
	}
	return cluster.NewBatch()
}

Expand All @@ -270,6 +278,7 @@ func (cluster *Cluster) Close() {
if cluster.closed.CompareAndSwap(false, true) {
close(cluster.closeCh)
close(cluster.updateList)
cluster.pipeline.Close()
for addr, node := range cluster.nodes {
node.shutdown()
delete(cluster.nodes, addr)
Expand Down
Loading
Loading