diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go b/tests/gocase/integration/slotmigrate/slotmigrate_test.go index 0f6d41dc1ab..a2527d137a5 100644 --- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go +++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go @@ -33,6 +33,18 @@ import ( "golang.org/x/exp/slices" ) +type SlotMigrationState string +type SlotImportState string + +const ( + SlotMigrationStateStarted SlotMigrationState = "start" + SlotMigrationStateSuccess SlotMigrationState = "success" + SlotMigrationStateFailed SlotMigrationState = "fail" + + SlotImportStateSuccess SlotImportState = "success" + SlotImportStateFailed SlotImportState = "error" +) + func TestSlotMigrateFromSlave(t *testing.T) { ctx := context.Background() @@ -56,11 +68,11 @@ func TestSlotMigrateFromSlave(t *testing.T) { require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) t.Run("MIGRATE - Slave cannot migrate slot", func(t *testing.T) { - require.ErrorContains(t, slaveClient.Do(ctx, "clusterx", "migrate", "1", masterID).Err(), "Can't migrate slot") + require.ErrorContains(t, slaveClient.Do(ctx, "clusterx", "migrate", 1, masterID).Err(), "Can't migrate slot") }) t.Run("MIGRATE - Cannot migrate slot to a slave", func(t *testing.T) { - require.ErrorContains(t, masterClient.Do(ctx, "clusterx", "migrate", "1", slaveID).Err(), "Can't migrate slot to a slave") + require.ErrorContains(t, masterClient.Do(ctx, "clusterx", "migrate", 1, slaveID).Err(), "Can't migrate slot to a slave") }) } @@ -92,20 +104,21 @@ func TestSlotMigrateDestServerKilled(t *testing.T) { require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) t.Run("MIGRATE - Slot is out of range", func(t *testing.T) { - require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", "-1", id1).Err(), "Slot is out of range") - require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", "16384", id1).Err(), "Slot is out of range") + require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", -1, id1).Err(), "Slot is out of range") + require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 16384, id1).Err(), "Slot is out of range") }) t.Run("MIGRATE - Cannot migrate slot to itself", func(t *testing.T) { - require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", "1", id0).Err(), "Can't migrate slot to myself") + require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", 1, id0).Err(), "Can't migrate slot to myself") }) t.Run("MIGRATE - Fail to migrate slot if destination server is not running", func(t *testing.T) { + slot := 1 srv1.Close() srv1Alive = false - require.NoError(t, rdb0.Do(ctx, "clusterx", "migrate", "1", id1).Err()) + require.NoError(t, rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Err()) time.Sleep(50 * time.Millisecond) - requireMigrateState(t, rdb0, "1", "fail") + requireMigrateState(t, rdb0, slot, SlotMigrationStateFailed) }) } @@ -137,52 +150,56 @@ func TestSlotMigrateDestServerKilledAgain(t *testing.T) { require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) t.Run("MIGRATE - Migrate slot with empty string key or value", func(t *testing.T) { + slot := 0 require.NoError(t, rdb0.Set(ctx, "", "slot0", 0).Err()) - require.NoError(t, rdb0.Del(ctx, util.SlotTable[0]).Err()) - require.NoError(t, rdb0.Set(ctx, util.SlotTable[0], "", 0).Err()) + require.NoError(t, rdb0.Del(ctx, util.SlotTable[slot]).Err()) + require.NoError(t, rdb0.Set(ctx, util.SlotTable[slot], "", 0).Err()) time.Sleep(500 * time.Millisecond) - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "0", id1).Val()) - waitForMigrateState(t, rdb0, "0", "success") + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) require.Equal(t, "slot0", rdb1.Get(ctx, "").Val()) - require.Equal(t, "", rdb1.Get(ctx, util.SlotTable[0]).Val()) - require.NoError(t, rdb1.Del(ctx, util.SlotTable[0]).Err()) + require.Equal(t, "", rdb1.Get(ctx, util.SlotTable[slot]).Val()) + require.NoError(t, rdb1.Del(ctx, util.SlotTable[slot]).Err()) }) t.Run("MIGRATE - Migrate binary key-value", func(t *testing.T) { - k1 := fmt.Sprintf("\x3a\x88{%s}\x3d\xaa", util.SlotTable[1]) + slot := 1 + k1 := fmt.Sprintf("\x3a\x88{%s}\x3d\xaa", util.SlotTable[slot]) cnt := 257 for i := 0; i < cnt; i++ { require.NoError(t, rdb0.LPush(ctx, k1, "\0000\0001").Err()) } - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "1", id1).Val()) - k2 := fmt.Sprintf("\x49\x1f\x7f{%s}\xaf", util.SlotTable[1]) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + k2 := fmt.Sprintf("\x49\x1f\x7f{%s}\xaf", util.SlotTable[slot]) require.NoError(t, rdb0.Set(ctx, k2, "\0000\0001", 0).Err()) time.Sleep(time.Second) - waitForImportState(t, rdb1, "1", "success") + waitForImportState(t, rdb1, slot, SlotImportStateSuccess) require.EqualValues(t, cnt, rdb1.LLen(ctx, k1).Val()) require.Equal(t, "\0000\0001", rdb1.LPop(ctx, k1).Val()) require.Equal(t, "\0000\0001", rdb1.Get(ctx, k2).Val()) }) t.Run("MIGRATE - Migrate empty slot", func(t *testing.T) { + slot := 2 require.NoError(t, rdb0.FlushDB(ctx).Err()) require.NoError(t, rdb1.FlushDB(ctx).Err()) time.Sleep(500 * time.Millisecond) - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "2", id1).Val()) - waitForMigrateState(t, rdb0, "2", "success") + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) require.NoError(t, rdb1.Keys(ctx, "*").Err()) }) t.Run("MIGRATE - Fail to migrate slot because destination server is killed while migrating", func(t *testing.T) { + slot := 8 for i := 0; i < 20000; i++ { - require.NoError(t, rdb0.LPush(ctx, util.SlotTable[8], i).Err()) + require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], i).Err()) } - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "8", id1).Val()) - requireMigrateState(t, rdb0, "8", "start") + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + requireMigrateState(t, rdb0, slot, SlotMigrationStateStarted) srv1.Close() srv1Alive = false time.Sleep(time.Second) - requireMigrateState(t, rdb0, "8", "fail") + requireMigrateState(t, rdb0, slot, SlotMigrationStateFailed) }) } @@ -214,33 +231,35 @@ func TestSlotMigrateSourceServerFlushedOrKilled(t *testing.T) { require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) t.Run("MIGRATE - Fail to migrate slot because source server is flushed", func(t *testing.T) { + slot := 11 for i := 0; i < 20000; i++ { - require.NoError(t, rdb0.LPush(ctx, util.SlotTable[11], i).Err()) + require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], i).Err()) } require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "32").Err()) require.Equal(t, map[string]string{"migrate-speed": "32"}, rdb0.ConfigGet(ctx, "migrate-speed").Val()) - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "11", id1).Val()) - waitForMigrateState(t, rdb0, "11", "start") + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + waitForMigrateState(t, rdb0, slot, SlotMigrationStateStarted) require.NoError(t, rdb0.FlushDB(ctx).Err()) time.Sleep(time.Second) - waitForMigrateState(t, rdb0, "11", "fail") + waitForMigrateState(t, rdb0, slot, SlotMigrationStateFailed) }) t.Run("MIGRATE - Fail to migrate slot because source server is killed while migrating", func(t *testing.T) { + slot := 20 for i := 0; i < 20000; i++ { - require.NoError(t, rdb0.LPush(ctx, util.SlotTable[20], i).Err()) + require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], i).Err()) } require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "32").Err()) require.Equal(t, map[string]string{"migrate-speed": "32"}, rdb0.ConfigGet(ctx, "migrate-speed").Val()) - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "20", id1).Val()) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) require.Eventually(t, func() bool { - return slices.Contains(rdb1.Keys(ctx, "*").Val(), util.SlotTable[20]) + return slices.Contains(rdb1.Keys(ctx, "*").Val(), util.SlotTable[slot]) }, 5*time.Second, 100*time.Millisecond) srv0.Close() srv0Alive = false time.Sleep(100 * time.Millisecond) - require.NotContains(t, rdb1.Keys(ctx, "*").Val(), util.SlotTable[20]) + require.NotContains(t, rdb1.Keys(ctx, "*").Val(), util.SlotTable[slot]) }) } @@ -267,18 +286,19 @@ func TestSlotMigrateNewNodeAndAuth(t *testing.T) { require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) t.Run("MIGRATE - Migrate slot to newly added node", func(t *testing.T) { - require.NoError(t, rdb0.Del(ctx, util.SlotTable[21]).Err()) - require.ErrorContains(t, rdb1.Set(ctx, util.SlotTable[21], "foobar", 0).Err(), "MOVED") + slot := 21 + require.NoError(t, rdb0.Del(ctx, util.SlotTable[slot]).Err()) + require.ErrorContains(t, rdb1.Set(ctx, util.SlotTable[slot], "foobar", 0).Err(), "MOVED") cnt := 100 for i := 0; i < cnt; i++ { - require.NoError(t, rdb0.LPush(ctx, util.SlotTable[21], i).Err()) + require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], i).Err()) } - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "21", id1).Val()) - waitForMigrateState(t, rdb0, "21", "success") - require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[21]).Val()) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) + require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[slot]).Val()) - k := fmt.Sprintf("{%s}_1", util.SlotTable[21]) + k := fmt.Sprintf("{%s}_1", util.SlotTable[slot]) require.ErrorContains(t, rdb0.Set(ctx, k, "slot21_value1", 0).Err(), "MOVED") require.Equal(t, "OK", rdb1.Set(ctx, k, "slot21_value1", 0).Val()) }) @@ -286,27 +306,28 @@ func TestSlotMigrateNewNodeAndAuth(t *testing.T) { t.Run("MIGRATE - Auth before migrating slot", func(t *testing.T) { require.NoError(t, rdb1.ConfigSet(ctx, "requirepass", "password").Err()) cnt := 100 + slot := 22 for i := 0; i < cnt; i++ { - require.NoError(t, rdb0.LPush(ctx, util.SlotTable[22], i).Err()) + require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], i).Err()) } // migrating slot will fail if no auth - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "22", id1).Val()) - waitForMigrateState(t, rdb0, "22", "fail") - require.ErrorContains(t, rdb1.Exists(ctx, util.SlotTable[22]).Err(), "MOVED") + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + waitForMigrateState(t, rdb0, slot, SlotMigrationStateFailed) + require.ErrorContains(t, rdb1.Exists(ctx, util.SlotTable[slot]).Err(), "MOVED") // migrating slot will fail if auth with wrong password require.NoError(t, rdb0.ConfigSet(ctx, "requirepass", "pass").Err()) - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "22", id1).Val()) - waitForMigrateState(t, rdb0, "22", "fail") - require.ErrorContains(t, rdb1.Exists(ctx, util.SlotTable[22]).Err(), "MOVED") + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + waitForMigrateState(t, rdb0, slot, SlotMigrationStateFailed) + require.ErrorContains(t, rdb1.Exists(ctx, util.SlotTable[slot]).Err(), "MOVED") // migrating slot will succeed if auth with right password require.NoError(t, rdb0.ConfigSet(ctx, "requirepass", "password").Err()) - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "22", id1).Val()) - waitForMigrateState(t, rdb0, "22", "success") - require.EqualValues(t, 1, rdb1.Exists(ctx, util.SlotTable[21]).Val()) - require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[22]).Val()) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) + require.EqualValues(t, 1, rdb1.Exists(ctx, util.SlotTable[slot]).Val()) + require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[slot]).Val()) }) } @@ -342,11 +363,12 @@ func TestSlotMigrateThreeNodes(t *testing.T) { require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) t.Run("MIGRATE - Fail to migrate slot because source server is changed to slave during migrating", func(t *testing.T) { + slot := 10 for i := 0; i < 10000; i++ { - require.NoError(t, rdb0.LPush(ctx, util.SlotTable[10], i).Err()) + require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], i).Err()) } - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "10", id2).Val()) - requireMigrateState(t, rdb0, "10", "start") + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id2).Val()) + requireMigrateState(t, rdb0, slot, SlotMigrationStateStarted) // change source server to slave by set topology clusterNodes := fmt.Sprintf("%s %s %d master - 0-10000\n", id1, srv1.Host(), srv1.Port()) @@ -358,7 +380,7 @@ func TestSlotMigrateThreeNodes(t *testing.T) { time.Sleep(time.Second) // check destination importing status - requireImportState(t, rdb2, "10", "error") + requireImportState(t, rdb2, slot, SlotImportStateFailed) }) } @@ -386,28 +408,31 @@ func TestSlotMigrateDataType(t *testing.T) { t.Run("MIGRATE - Cannot migrate two slot at the same time", func(t *testing.T) { cnt := 20000 + slot := 0 for i := 0; i < cnt; i++ { - require.NoError(t, rdb0.LPush(ctx, util.SlotTable[0], i).Err()) + require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], i).Err()) } - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "0", id1).Val()) - require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", "2", id1).Err(), "There is already a migrating slot") - waitForMigrateState(t, rdb0, "0", "success") - require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[0]).Val()) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + otherSlot := 2 + require.ErrorContains(t, rdb0.Do(ctx, "clusterx", "migrate", otherSlot, id1).Err(), "There is already a migrating slot") + waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) + require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[slot]).Val()) }) t.Run("MIGRATE - Slot migrate all types of existing data", func(t *testing.T) { + slot := 1 keys := make(map[string]string, 0) - for _, typ := range []string{"string", "string2", "list", "hash", "set", "zset", "bitmap", "sortint", "stream"} { - keys[typ] = fmt.Sprintf("%s_{%s}", typ, util.SlotTable[1]) + for _, typ := range []string{"string", "expired_string", "list", "hash", "set", "zset", "bitmap", "sortint", "stream"} { + keys[typ] = fmt.Sprintf("%s_{%s}", typ, util.SlotTable[slot]) require.NoError(t, rdb0.Del(ctx, keys[typ]).Err()) } // type string require.NoError(t, rdb0.Set(ctx, keys["string"], keys["string"], 0).Err()) require.NoError(t, rdb0.Expire(ctx, keys["string"], 10*time.Second).Err()) // type expired string - require.NoError(t, rdb0.Set(ctx, keys["string2"], keys["string2"], time.Second).Err()) + require.NoError(t, rdb0.Set(ctx, keys["expired_string"], keys["expired_string"], time.Second).Err()) time.Sleep(3 * time.Second) - require.Empty(t, rdb0.Get(ctx, keys["string2"]).Val()) + require.Empty(t, rdb0.Get(ctx, keys["expired_string"]).Val()) // type list require.NoError(t, rdb0.RPush(ctx, keys["list"], 0, 1, 2, 3, 4, 5).Err()) require.NoError(t, rdb0.LPush(ctx, keys["list"], 9, 3, 7, 3, 5, 4).Err()) @@ -470,14 +495,14 @@ func TestSlotMigrateDataType(t *testing.T) { require.EqualValues(t, 19, streamInfo.Length) // migrate slot 1, all keys above are belong to slot 1 - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "1", id1).Val()) - waitForMigrateState(t, rdb0, "1", "success") + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) // check destination data // type string require.Equal(t, keys["string"], rdb1.Get(ctx, keys["string"]).Val()) util.BetweenValues(t, rdb1.TTL(ctx, keys["string"]).Val(), time.Second, 10*time.Second) - require.Empty(t, rdb1.Get(ctx, keys["string2"]).Val()) + require.Empty(t, rdb1.Get(ctx, keys["expired_string"]).Val()) // type list require.EqualValues(t, lv, rdb1.LRange(ctx, keys["list"], 0, -1).Val()) util.BetweenValues(t, rdb1.TTL(ctx, keys["list"]).Val(), time.Second, 10*time.Second) @@ -519,8 +544,8 @@ func TestSlotMigrateDataType(t *testing.T) { }) t.Run("MIGRATE - Migrating empty stream", func(t *testing.T) { - slotID := 31 - key := fmt.Sprintf("stream_{%s}", util.SlotTable[slotID]) + slot := 31 + key := fmt.Sprintf("stream_{%s}", util.SlotTable[slot]) require.NoError(t, rdb0.Del(ctx, key).Err()) @@ -545,8 +570,8 @@ func TestSlotMigrateDataType(t *testing.T) { require.EqualValues(t, "7-0", originRes.MaxDeletedEntryID) require.EqualValues(t, 0, originRes.Length) - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slotID, id1).Val()) - waitForMigrateState(t, rdb0, strconv.FormatInt(int64(slotID), 10), "success") + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) require.ErrorContains(t, rdb0.Exists(ctx, key).Err(), "MOVED") @@ -560,8 +585,8 @@ func TestSlotMigrateDataType(t *testing.T) { }) t.Run("MIGRATE - Migrating stream with deleted entries", func(t *testing.T) { - slotID := 32 - key := fmt.Sprintf("stream_{%s}", util.SlotTable[slotID]) + slot := 32 + key := fmt.Sprintf("stream_{%s}", util.SlotTable[slot]) require.NoError(t, rdb0.Del(ctx, key).Err()) @@ -584,8 +609,8 @@ func TestSlotMigrateDataType(t *testing.T) { require.EqualValues(t, "3-0", originRes.MaxDeletedEntryID) require.EqualValues(t, 3, originRes.Length) - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slotID, id1).Val()) - waitForMigrateState(t, rdb0, strconv.FormatInt(int64(slotID), 10), "success") + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) require.ErrorContains(t, rdb0.Exists(ctx, key).Err(), "MOVED") @@ -599,43 +624,47 @@ func TestSlotMigrateDataType(t *testing.T) { }) t.Run("MIGRATE - Accessing slot is forbidden on source server but not on destination server", func(t *testing.T) { - require.NoError(t, rdb0.Set(ctx, util.SlotTable[3], 3, 0).Err()) - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "3", id1).Val()) - waitForMigrateState(t, rdb0, "3", "success") - require.ErrorContains(t, rdb0.Set(ctx, util.SlotTable[3], "slot3", 0).Err(), "MOVED") - require.ErrorContains(t, rdb0.Del(ctx, util.SlotTable[3]).Err(), "MOVED") - require.ErrorContains(t, rdb0.Exists(ctx, util.SlotTable[3]).Err(), "MOVED") - require.NoError(t, rdb0.Set(ctx, util.SlotTable[4], "slot4", 0).Err()) + slot := 3 + require.NoError(t, rdb0.Set(ctx, util.SlotTable[slot], 3, 0).Err()) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) + require.ErrorContains(t, rdb0.Set(ctx, util.SlotTable[slot], "source-value", 0).Err(), "MOVED") + require.ErrorContains(t, rdb0.Del(ctx, util.SlotTable[slot]).Err(), "MOVED") + require.ErrorContains(t, rdb0.Exists(ctx, util.SlotTable[slot]).Err(), "MOVED") + require.NoError(t, rdb1.Set(ctx, util.SlotTable[slot], "destination-value", 0).Err()) }) t.Run("MIGRATE - Slot isn't forbidden writing when starting migrating", func(t *testing.T) { + slot := 5 cnt := 20000 for i := 0; i < cnt; i++ { - require.NoError(t, rdb0.LPush(ctx, util.SlotTable[5], i).Err()) + require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], i).Err()) } - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "5", id1).Val()) - requireMigrateState(t, rdb0, "5", "start") + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + requireMigrateState(t, rdb0, slot, SlotMigrationStateStarted) // write during migrating - require.EqualValues(t, cnt+1, rdb0.LPush(ctx, util.SlotTable[5], cnt).Val()) - waitForMigrateState(t, rdb0, "5", "success") - require.Equal(t, strconv.Itoa(cnt), rdb1.LPop(ctx, util.SlotTable[5]).Val()) + require.EqualValues(t, cnt+1, rdb0.LPush(ctx, util.SlotTable[slot], cnt).Val()) + waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) + require.Equal(t, strconv.Itoa(cnt), rdb1.LPop(ctx, util.SlotTable[slot]).Val()) }) t.Run("MIGRATE - Slot keys are not cleared after migration but cleared after setslot", func(t *testing.T) { - require.NoError(t, rdb0.Set(ctx, util.SlotTable[6], "slot6", 0).Err()) - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "6", id1).Val()) - waitForMigrateState(t, rdb0, "6", "success") - require.Equal(t, "slot6", rdb1.Get(ctx, util.SlotTable[6]).Val()) - require.Contains(t, rdb0.Keys(ctx, "*").Val(), util.SlotTable[6]) - require.NoError(t, rdb0.Do(ctx, "clusterx", "setslot", "6", "node", id1, "2").Err()) - require.NotContains(t, rdb0.Keys(ctx, "*").Val(), util.SlotTable[6]) + slot := 6 + require.NoError(t, rdb0.Set(ctx, util.SlotTable[slot], "slot6", 0).Err()) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) + require.Equal(t, "slot6", rdb1.Get(ctx, util.SlotTable[slot]).Val()) + require.Contains(t, rdb0.Keys(ctx, "*").Val(), util.SlotTable[slot]) + require.NoError(t, rdb0.Do(ctx, "clusterx", "setslot", slot, "node", id1, "2").Err()) + require.NotContains(t, rdb0.Keys(ctx, "*").Val(), util.SlotTable[slot]) }) t.Run("MIGRATE - Migrate incremental data via parsing and filtering data in WAL", func(t *testing.T) { + migratingSlot := 15 keys := []string{ - // slot15 key for slowing migrate-speed when migrating existing data - util.SlotTable[15], - // slot15 all types keys string/hash/set/zset/list/sortint + // key for slowing migrate-speed when migrating existing data + util.SlotTable[migratingSlot], + // the following keys belong to slot 15; keys of all the data types (string/hash/set/zset/list/sortint) "key:000042915392", "key:000043146202", "key:000044434182", @@ -656,11 +685,12 @@ func TestSlotMigrateDataType(t *testing.T) { for i := 0; i < cnt; i++ { require.NoError(t, rdb0.LPush(ctx, keys[0], i).Err()) } - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "15", id1).Val()) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", migratingSlot, id1).Val()) // write key that doesn't belong to this slot - require.NoError(t, rdb0.Del(ctx, util.SlotTable[12]).Err()) - require.NoError(t, rdb0.Set(ctx, util.SlotTable[12], "slot12", 0).Err()) + nonMigratingSlot := 12 + require.NoError(t, rdb0.Del(ctx, util.SlotTable[nonMigratingSlot]).Err()) + require.NoError(t, rdb0.Set(ctx, util.SlotTable[nonMigratingSlot], "non-migrating-value", 0).Err()) // write increment operations include all kinds of types // 1. type string @@ -671,13 +701,15 @@ func TestSlotMigrateDataType(t *testing.T) { require.NoError(t, rdb0.Del(ctx, keys[2]).Err()) require.NoError(t, rdb0.SetBit(ctx, keys[3], 10086, 1).Err()) require.NoError(t, rdb0.Expire(ctx, keys[3], 10000*time.Second).Err()) - require.NoError(t, rdb0.Del(ctx, util.SlotTable[13]).Err()) // verify expireat binlog could be parsed - require.NoError(t, rdb0.Set(ctx, util.SlotTable[13], "slot13", 0).Err()) - require.NoError(t, rdb0.ExpireAt(ctx, util.SlotTable[13], time.Now().Add(100*time.Second)).Err()) + slotWithExpiringKey := nonMigratingSlot + 1 + require.NoError(t, rdb0.Del(ctx, util.SlotTable[slotWithExpiringKey]).Err()) + require.NoError(t, rdb0.Set(ctx, util.SlotTable[slotWithExpiringKey], "expiring-value", 0).Err()) + require.NoError(t, rdb0.ExpireAt(ctx, util.SlotTable[slotWithExpiringKey], time.Now().Add(100*time.Second)).Err()) // verify del command - require.NoError(t, rdb0.Set(ctx, util.SlotTable[14], "slot14", 0).Err()) - require.NoError(t, rdb0.Del(ctx, util.SlotTable[14]).Err()) + slotWithDeletedKey := nonMigratingSlot + 2 + require.NoError(t, rdb0.Set(ctx, util.SlotTable[slotWithDeletedKey], "will-be-deleted", 0).Err()) + require.NoError(t, rdb0.Del(ctx, util.SlotTable[slotWithDeletedKey]).Err()) // 2. type hash require.NoError(t, rdb0.HMSet(ctx, keys[4], "f1", "1", "f2", "2").Err()) require.NoError(t, rdb0.HDel(ctx, keys[4], "f1").Err()) @@ -728,8 +760,8 @@ func TestSlotMigrateDataType(t *testing.T) { zv := rdb0.ZRangeWithScores(ctx, keys[6], 0, -1).Val() lv := rdb0.LRange(ctx, keys[7], 0, -1).Val() siv := rdb0.Do(ctx, "SIRANGE", keys[9], 0, -1).Val() - waitForMigrateStateInDuration(t, rdb0, "15", "success", time.Minute) - waitForImportState(t, rdb1, "15", "success") + waitForMigrateStateInDuration(t, rdb0, migratingSlot, SlotMigrationStateSuccess, time.Minute) + waitForImportState(t, rdb1, migratingSlot, SlotImportStateSuccess) // check if the data is consistent // 1. type string require.EqualValues(t, cnt, rdb1.LLen(ctx, keys[0]).Val()) @@ -738,7 +770,7 @@ func TestSlotMigrateDataType(t *testing.T) { require.Empty(t, rdb1.Get(ctx, keys[2]).Val()) require.EqualValues(t, bv, rdb1.GetBit(ctx, keys[3], 10086).Val()) require.Less(t, rdb1.TTL(ctx, keys[3]).Val()-bt, 100*time.Second) - require.ErrorContains(t, rdb1.Exists(ctx, util.SlotTable[13]).Err(), "MOVED") + require.ErrorContains(t, rdb1.Exists(ctx, util.SlotTable[slotWithExpiringKey]).Err(), "MOVED") // 2. type hash require.EqualValues(t, hv, rdb1.HGetAll(ctx, keys[4]).Val()) require.EqualValues(t, "3", rdb1.HGet(ctx, keys[4], "f2").Val()) @@ -762,73 +794,75 @@ func TestSlotMigrateDataType(t *testing.T) { // 7. type sortint require.EqualValues(t, siv, rdb1.Do(ctx, "SIRANGE", keys[9], 0, -1).Val()) - // not migrate if the key doesn't belong to slot 1 - require.Equal(t, "slot12", rdb0.Get(ctx, util.SlotTable[12]).Val()) - require.ErrorContains(t, rdb1.Exists(ctx, util.SlotTable[12]).Err(), "MOVED") - require.EqualValues(t, 0, rdb0.Exists(ctx, util.SlotTable[14]).Val()) + // not migrate if the key doesn't belong to slot + require.Equal(t, "non-migrating-value", rdb0.Get(ctx, util.SlotTable[nonMigratingSlot]).Val()) + require.ErrorContains(t, rdb1.Exists(ctx, util.SlotTable[nonMigratingSlot]).Err(), "MOVED") + require.EqualValues(t, 0, rdb0.Exists(ctx, util.SlotTable[slotWithDeletedKey]).Val()) }) t.Run("MIGRATE - Slow migrate speed", func(t *testing.T) { + slot := 16 require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "16").Err()) require.Equal(t, map[string]string{"migrate-speed": "16"}, rdb0.ConfigGet(ctx, "migrate-speed").Val()) - require.NoError(t, rdb0.Del(ctx, util.SlotTable[16]).Err()) + require.NoError(t, rdb0.Del(ctx, util.SlotTable[slot]).Err()) // more than pipeline size(16) and max items(16) in command cnt := 1000 for i := 0; i < cnt; i++ { - require.NoError(t, rdb0.LPush(ctx, util.SlotTable[16], i).Err()) + require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], i).Err()) } - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "16", id1).Val()) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) // should not finish 1.5s time.Sleep(1500 * time.Millisecond) - requireMigrateState(t, rdb0, "16", "start") - waitForMigrateState(t, rdb0, "16", "success") + requireMigrateState(t, rdb0, slot, SlotMigrationStateStarted) + waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) }) t.Run("MIGRATE - Data of migrated slot can't be written to source but can be written to destination", func(t *testing.T) { - require.NoError(t, rdb0.Del(ctx, util.SlotTable[17]).Err()) + slot := 17 + require.NoError(t, rdb0.Del(ctx, util.SlotTable[slot]).Err()) cnt := 100 for i := 0; i < cnt; i++ { - require.NoError(t, rdb0.LPush(ctx, util.SlotTable[17], i).Err()) + require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], i).Err()) } - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", "17", id1).Val()) - waitForMigrateState(t, rdb0, "17", "success") - require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[17]).Val()) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) + require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[slot]).Val()) // write the migrated slot to source server - k := fmt.Sprintf("{%s}_1", util.SlotTable[17]) + k := fmt.Sprintf("{%s}_1", util.SlotTable[slot]) require.ErrorContains(t, rdb0.Set(ctx, k, "slot17_value1", 0).Err(), "MOVED") // write the migrated slot to destination server require.NoError(t, rdb1.Set(ctx, k, "slot17_value1", 0).Err()) }) } -func waitForMigrateState(t testing.TB, client *redis.Client, n, state string) { - waitForMigrateStateInDuration(t, client, n, state, 5*time.Second) +func waitForMigrateState(t testing.TB, client *redis.Client, slot int, state SlotMigrationState) { + waitForMigrateStateInDuration(t, client, slot, state, 5*time.Second) } -func waitForMigrateStateInDuration(t testing.TB, client *redis.Client, n, state string, d time.Duration) { +func waitForMigrateStateInDuration(t testing.TB, client *redis.Client, slot int, state SlotMigrationState, d time.Duration) { require.Eventually(t, func() bool { i := client.ClusterInfo(context.Background()).Val() - return strings.Contains(i, fmt.Sprintf("migrating_slot: %s", n)) && + return strings.Contains(i, fmt.Sprintf("migrating_slot: %d", slot)) && strings.Contains(i, fmt.Sprintf("migrating_state: %s", state)) }, d, 100*time.Millisecond) } -func requireMigrateState(t testing.TB, client *redis.Client, n, state string) { +func requireMigrateState(t testing.TB, client *redis.Client, slot int, state SlotMigrationState) { i := client.ClusterInfo(context.Background()).Val() - require.Contains(t, i, fmt.Sprintf("migrating_slot: %s", n)) + require.Contains(t, i, fmt.Sprintf("migrating_slot: %d", slot)) require.Contains(t, i, fmt.Sprintf("migrating_state: %s", state)) } -func waitForImportState(t testing.TB, client *redis.Client, n, state string) { +func waitForImportState(t testing.TB, client *redis.Client, n int, state SlotImportState) { require.Eventually(t, func() bool { i := client.ClusterInfo(context.Background()).Val() - return strings.Contains(i, fmt.Sprintf("importing_slot: %s", n)) && + return strings.Contains(i, fmt.Sprintf("importing_slot: %d", n)) && strings.Contains(i, fmt.Sprintf("import_state: %s", state)) }, 5*time.Second, 100*time.Millisecond) } -func requireImportState(t testing.TB, client *redis.Client, n, state string) { +func requireImportState(t testing.TB, client *redis.Client, n int, state SlotImportState) { i := client.ClusterInfo(context.Background()).Val() - require.Contains(t, i, fmt.Sprintf("importing_slot: %s", n)) + require.Contains(t, i, fmt.Sprintf("importing_slot: %d", n)) require.Contains(t, i, fmt.Sprintf("import_state: %s", state)) }