diff --git a/tests/gocase/go.mod b/tests/gocase/go.mod index 21fb651f4e2..aa66422603d 100644 --- a/tests/gocase/go.mod +++ b/tests/gocase/go.mod @@ -23,6 +23,7 @@ require ( github.com/tklauser/go-sysconf v0.3.14 // indirect github.com/tklauser/numcpus v0.9.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect + golang.org/x/sync v0.10.0 golang.org/x/sys v0.27.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/tests/gocase/go.sum b/tests/gocase/go.sum index 19ce5b37534..b153085f615 100644 --- a/tests/gocase/go.sum +++ b/tests/gocase/go.sum @@ -29,8 +29,6 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tklauser/go-sysconf v0.3.14 h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZb78yU= @@ -39,10 +37,10 @@ github.com/tklauser/numcpus v0.9.0 h1:lmyCHtANi8aRUgkckBgoDk1nHCux3n2cgkJLXdQGPD github.com/tklauser/numcpus v0.9.0/go.mod h1:SN6Nq1O3VychhC1npsWostA+oW+VOQTxZrS604NSRyI= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY= -golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8= golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f h1:XdNn9LlyWAhLVp6P/i8QYBW+hlyhrhei9uErw2B5GJo= golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f/go.mod h1:D5SMRVC3C2/4+F/DB1wZsLRnSNimn2Sp/NPsCrsv8ak= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/tests/gocase/integration/replication/replication_test.go b/tests/gocase/integration/replication/replication_test.go index bb88f0220d0..ac01e1b9853 100644 --- a/tests/gocase/integration/replication/replication_test.go +++ b/tests/gocase/integration/replication/replication_test.go @@ -552,3 +552,60 @@ func TestFullSyncReplication(t *testing.T) { require.Equal(t, "bar", slaveClient.Get(ctx, "foo").Val()) }) } + +func TestSlaveLostMaster(t *testing.T) { + // integration test for #2662 and #2671 + ctx := context.Background() + + masterSrv := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + "max-replication-mb": "1", + "rocksdb.compression": "no", + "rocksdb.write_buffer_size": "1", + "rocksdb.target_file_size_base": "1", + }) + defer func() { masterSrv.Close() }() + masterClient := masterSrv.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterNodeID).Err()) + + replicaSrv := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + "replication-connect-timeout-ms": "5000", + "replication-recv-timeout-ms": "5100", + }) + defer func() { replicaSrv.Close() }() + replicaClient := replicaSrv.NewClient() + // allow to run the read-only command in the replica + require.NoError(t, replicaClient.ReadOnly(ctx).Err()) + defer func() { require.NoError(t, replicaClient.Close()) }() + replicaNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODEID", replicaNodeID).Err()) + + proxyCtx, cancelProxy := context.WithCancel(ctx) + newMasterPort := util.SimpleTCPProxy(proxyCtx, t, fmt.Sprintf("127.0.0.1:%d", masterSrv.Port()), true) + + masterNodesInfo := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383\n%s 127.0.0.1 %d slave %s", + masterNodeID, masterSrv.Port(), replicaNodeID, replicaSrv.Port(), masterNodeID) + clusterNodesInfo := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383\n%s 127.0.0.1 %d slave %s", + masterNodeID, newMasterPort, replicaNodeID, replicaSrv.Port(), masterNodeID) + unexistNodesInfo := fmt.Sprintf("%s 127.0.0.2 %d master - 0-16383\n%s 127.0.0.1 %d slave %s", + masterNodeID, newMasterPort, replicaNodeID, replicaSrv.Port(), masterNodeID) + + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", masterNodesInfo, "1").Err()) + value := strings.Repeat("a", 128*1024) + + for i := 0; i < 1024; i++ { + require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key%d", i), value, 0).Err()) + } + + require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES", clusterNodesInfo, "1").Err()) + + time.Sleep(2 * time.Second) + cancelProxy() + start := time.Now() + require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES", unexistNodesInfo, "2").Err()) + duration := time.Since(start) + require.Less(t, duration, time.Second*6) +} diff --git a/tests/gocase/util/client.go b/tests/gocase/util/client.go index 6d4fb5bbf3a..27961c9d7d4 100644 --- a/tests/gocase/util/client.go +++ b/tests/gocase/util/client.go @@ -21,7 +21,10 @@ package util import ( "context" + "errors" "fmt" + "io" + "net" "regexp" "strings" "testing" @@ -29,6 +32,7 @@ import ( "github.com/redis/go-redis/v9" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func FindInfoEntry(rdb *redis.Client, key string, section ...string) string { @@ -71,3 +75,73 @@ func Populate(t testing.TB, rdb *redis.Client, prefix string, n, size int) { _, err := p.Exec(ctx) require.NoError(t, err) } + +func SimpleTCPProxy(ctx context.Context, t testing.TB, to string, slowdown bool) uint64 { + addr, err := findFreePort() + if err != nil { + t.Fatalf("can't find a free port, %v", err) + } + from := addr.String() + + listener, err := net.Listen("tcp", from) + if err != nil { + t.Fatalf("listen to %s failed, err: %v", from, err) + } + + copyBytes := func(src, dest io.ReadWriter) func() error { + buffer := make([]byte, 4096) + return func() error { + COPY_LOOP: + for { + select { + case <-ctx.Done(): + t.Log("forwarding tcp stream stopped") + break COPY_LOOP + default: + if slowdown { + time.Sleep(time.Millisecond * 100) + } + n, err := src.Read(buffer) + if err != nil && !errors.Is(err, io.EOF) { + return err + } + _, err = dest.Write(buffer[:n]) + if err != nil && !errors.Is(err, io.EOF) { + return err + } + } + } + return nil + } + } + + go func() { + defer listener.Close() + LISTEN_LOOP: + for { + select { + case <-ctx.Done(): + break LISTEN_LOOP + + default: + conn, err := listener.Accept() + if err != nil { + t.Fatalf("accept conn failed, err: %v", err) + } + dest, err := net.Dial("tcp", to) + if err != nil { + t.Fatalf("accept conn failed, err: %v", err) + } + var errGrp errgroup.Group + errGrp.Go(copyBytes(conn, dest)) + errGrp.Go(copyBytes(dest, conn)) + err = errGrp.Wait() + if err != nil { + t.Fatalf("forward tcp stream failed, err: %v", err) + } + + } + } + }() + return uint64(addr.Port) +}