diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go
index 7ff9d6b860f..1a333af253b 100644
--- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go
+++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go
@@ -1089,10 +1089,18 @@ func vtctlBackupReplicaNoDestroyNoWrites(t *testing.T, replicaIndex int) (backup
 	return backups
 }
 
+func GetTabletPosition(t *testing.T, tablet *cluster.Vttablet) string {
+	pos, _ := cluster.GetPrimaryPosition(t, *tablet, hostname)
+	return pos
+}
+
 func GetReplicaPosition(t *testing.T, replicaIndex int) string {
 	replica := getReplica(t, replicaIndex)
-	pos, _ := cluster.GetPrimaryPosition(t, *replica, hostname)
-	return pos
+	return GetTabletPosition(t, replica)
+}
+
+func GetPrimaryPosition(t *testing.T) string {
+	return GetTabletPosition(t, primary)
 }
 
 func GetReplicaGtidPurged(t *testing.T, replicaIndex int) string {
@@ -1147,16 +1155,26 @@ func ReadRowsFromReplica(t *testing.T, replicaIndex int) (msgs []string) {
 	return ReadRowsFromTablet(t, getReplica(t, replicaIndex))
 }
 
-// FlushBinaryLogsOnReplica issues `FLUSH BINARY LOGS` times
-func FlushBinaryLogsOnReplica(t *testing.T, replicaIndex int, count int) {
-	replica := getReplica(t, replicaIndex)
+// FlushBinaryLogsOnTablet issues `FLUSH BINARY LOGS` `count` times
+func FlushBinaryLogsOnTablet(t *testing.T, tablet *cluster.Vttablet, count int) {
 	query := "flush binary logs"
 	for i := 0; i < count; i++ {
-		_, err := replica.VttabletProcess.QueryTablet(query, keyspaceName, true)
+		_, err := tablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
 		require.NoError(t, err)
 	}
 }
 
+// FlushBinaryLogsOnReplica issues `FLUSH BINARY LOGS` `count` times
+func FlushBinaryLogsOnReplica(t *testing.T, replicaIndex int, count int) {
+	replica := getReplica(t, replicaIndex)
+	FlushBinaryLogsOnTablet(t, replica, count)
+}
+
+// FlushBinaryLogsOnPrimary issues `FLUSH BINARY LOGS` `count` times
+func FlushBinaryLogsOnPrimary(t *testing.T, count int) {
+	FlushBinaryLogsOnTablet(t, primary, count)
+}
+
 // FlushAndPurgeBinaryLogsOnReplica intentionally loses all existing binary logs. It flushes into a new binary log
 // and immediately purges all previous logs.
 // This is used to lose information.
diff --git a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
index ddeb43c7dd7..860d0b54c7a 100644
--- a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
+++ b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
@@ -27,7 +27,6 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"vitess.io/vitess/go/mysql/replication"
-	"vitess.io/vitess/go/test/endtoend/cluster"
 	"vitess.io/vitess/go/vt/mysqlctl"
 )
 
@@ -61,10 +60,10 @@ type testedBackupTimestampInfo struct {
 func waitForReplica(t *testing.T, replicaIndex int) {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
-	pMsgs := ReadRowsFromPrimary(t)
 	for {
-		rMsgs := ReadRowsFromReplica(t, replicaIndex)
-		if len(pMsgs) == len(rMsgs) {
+		primaryPos := GetPrimaryPosition(t)
+		replicaPos := GetReplicaPosition(t, replicaIndex)
+		if replicaPos == primaryPos {
 			// success
 			return
 		}
@@ -170,10 +169,10 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
 			if tc.writeBeforeBackup {
 				InsertRowOnPrimary(t, "")
 			}
-			// we wait for 1 second because backups are written to a directory named after the current timestamp,
+			// we wait for >1 second because backups are written to a directory named after the current timestamp,
 			// in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this
 			// is only ever a problem in this end-to-end test, not in production.
-			// Also, we gie the replica a chance to catch up.
+			// Also, we give the replica a chance to catch up.
 			time.Sleep(postWriteSleepDuration)
 			// randomly flush binary logs 0, 1 or 2 times
 			FlushBinaryLogsOnReplica(t, 0, rand.Intn(3))