diff --git a/go/cmd/vtorc/main.go b/go/cmd/vtorc/main.go index c80f0573948..6c27c17a29d 100644 --- a/go/cmd/vtorc/main.go +++ b/go/cmd/vtorc/main.go @@ -106,6 +106,7 @@ func main() { servenv.ParseFlags("vtorc") servenv.Init() config.UpdateConfigValuesFromFlags() + inst.RegisterStats() log.Info("starting vtorc") if len(configFile) > 0 { diff --git a/go/test/endtoend/vtorc/api/api_test.go b/go/test/endtoend/vtorc/api/api_test.go index 065124b4e39..7dd5c50eefa 100644 --- a/go/test/endtoend/vtorc/api/api_test.go +++ b/go/test/endtoend/vtorc/api/api_test.go @@ -19,6 +19,7 @@ package api import ( "encoding/json" "fmt" + "math" "reflect" "testing" "time" @@ -200,22 +201,59 @@ func TestAPIEndpoints(t *testing.T) { assert.Equal(t, 400, status, resp) assert.Equal(t, "Filtering by shard without keyspace isn't supported\n", resp) - // Also verify that the metric for errant GTIDs is reporting the correct information. - _, resp, err = utils.MakeAPICall(t, vtorc, "/debug/vars") + // Also verify that we see the tablet in the errant GTIDs API call + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/errant-gtids") require.NoError(t, err) - resultMap := make(map[string]any) - err = json.Unmarshal([]byte(resp), &resultMap) + assert.Equal(t, 200, status, resp) + assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias)) + + // Check that filtering using keyspace and shard works + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/errant-gtids?keyspace=ks&shard=0") + require.NoError(t, err) + assert.Equal(t, 200, status, resp) + assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias)) + + // Check that filtering using keyspace works + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/errant-gtids?keyspace=ks") + require.NoError(t, err) + assert.Equal(t, 200, status, resp) + assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias)) + + // Check that filtering by a keyspace and shard that match no tablet returns an empty (null) result + status, resp, err = 
utils.MakeAPICall(t, vtorc, "/api/errant-gtids?keyspace=ks&shard=80-") require.NoError(t, err) - errantGTIDMap := reflect.ValueOf(resultMap["ErrantGtidMap"]) - errantGtidTablets := errantGTIDMap.MapKeys() - require.Len(t, errantGtidTablets, 3) + assert.Equal(t, 200, status, resp) + assert.Equal(t, "null", resp) - errantGTIDinReplica := "" - for _, tabletKey := range errantGtidTablets { - if tabletKey.String() == replica.Alias { - errantGTIDinReplica = errantGTIDMap.MapIndex(tabletKey).Interface().(string) + // Check that filtering using just the shard fails + status, resp, err = utils.MakeAPICall(t, vtorc, "/api/errant-gtids?shard=0") + require.NoError(t, err) + assert.Equal(t, 400, status, resp) + assert.Equal(t, "Filtering by shard without keyspace isn't supported\n", resp) + + // Also verify that the metric for errant GTIDs is reporting the correct count. + waitForErrantGTIDCount(t, vtorc, 1) + }) +} + +func waitForErrantGTIDCount(t *testing.T, vtorc *cluster.VTOrcProcess, errantGTIDCountWanted int) { + timeout := time.After(15 * time.Second) + for { + select { + case <-timeout: + t.Fatalf("Timed out waiting for errant gtid count in the metrics to be %v", errantGTIDCountWanted) + return + default: + _, resp, err := utils.MakeAPICall(t, vtorc, "/debug/vars") + require.NoError(t, err) + resultMap := make(map[string]any) + err = json.Unmarshal([]byte(resp), &resultMap) + require.NoError(t, err) + errantGTIDTabletsCount := reflect.ValueOf(resultMap["ErrantGtidTabletCount"]) + if int(math.Round(errantGTIDTabletsCount.Float())) == errantGTIDCountWanted { + return } + time.Sleep(100 * time.Millisecond) } - require.NotEmpty(t, errantGTIDinReplica) - }) + } } diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index 031e648c328..abe233bf96e 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -56,12 +56,6 @@ const ( var instanceReadChan = make(chan bool, backendDBConcurrency) var instanceWriteChan = 
make(chan bool, backendDBConcurrency) -var ( - // Mutex to protect the access of the following variable - errantGtidMapMu = sync.Mutex{} - errantGtidMap = make(map[string]string) -) - var forgetAliases *cache.Cache var accessDeniedCounter = metrics.NewCounter() @@ -81,11 +75,6 @@ func init() { _ = metrics.Register("instance.write", writeInstanceCounter) _ = writeBufferLatency.AddMany([]string{"wait", "write"}) writeBufferLatency.Start("wait") - stats.NewStringMapFuncWithMultiLabels("ErrantGtidMap", "Metric to track the errant GTIDs detected by VTOrc", []string{"TabletAlias"}, "ErrantGtid", func() map[string]string { - errantGtidMapMu.Lock() - defer errantGtidMapMu.Unlock() - return errantGtidMap - }) go initializeInstanceDao() } @@ -156,6 +145,14 @@ func logReadTopologyInstanceError(tabletAlias string, hint string, err error) er return fmt.Errorf(msg) } +// RegisterStats registers stats from the inst package +func RegisterStats() { + stats.NewGaugeFunc("ErrantGtidTabletCount", "Number of tablets with errant GTIDs", func() int64 { + instances, _ := ReadInstancesWithErrantGTIds("", "") + return int64(len(instances)) + }) +} + // ReadTopologyInstance collects information on the state of a MySQL // server and writes the result synchronously to the vtorc // backend. 
@@ -385,12 +382,6 @@ Cleanup: instance.GtidErrant, err = vitessmysql.Subtract(redactedExecutedGtidSet.String(), redactedPrimaryExecutedGtidSet.String()) } } - // update the errant gtid map - go func() { - errantGtidMapMu.Lock() - defer errantGtidMapMu.Unlock() - errantGtidMap[topoproto.TabletAliasString(tablet.Alias)] = instance.GtidErrant - }() } latency.Stop("instance") @@ -682,6 +673,18 @@ func ReadProblemInstances(keyspace string, shard string) ([](*Instance), error) return readInstancesByCondition(condition, args, "") } +// ReadInstancesWithErrantGTIds reads all instances with errant GTIDs +func ReadInstancesWithErrantGTIds(keyspace string, shard string) ([]*Instance, error) { + condition := ` + keyspace LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) + and shard LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END) + and gtid_errant != '' + ` + + args := sqlutils.Args(keyspace, keyspace, shard, shard) + return readInstancesByCondition(condition, args, "") +} + // GetKeyspaceShardName gets the keyspace shard name for the given instance key func GetKeyspaceShardName(tabletAlias string) (keyspace string, shard string, err error) { query := ` diff --git a/go/vt/vtorc/inst/instance_dao_test.go b/go/vt/vtorc/inst/instance_dao_test.go index d6eae6d5da2..549389f91fe 100644 --- a/go/vt/vtorc/inst/instance_dao_test.go +++ b/go/vt/vtorc/inst/instance_dao_test.go @@ -326,6 +326,90 @@ func TestReadProblemInstances(t *testing.T) { } } +// TestReadInstancesWithErrantGTIds is used to test the functionality of ReadInstancesWithErrantGTIds and verify its failure modes and successes. +func TestReadInstancesWithErrantGTIds(t *testing.T) { + // The test is intended to be used as follows. The initial data is stored into the database. Following this, some specific queries are run that each individual test specifies to get the desired state. 
+ tests := []struct { + name string + keyspace string + shard string + sql []string + instancesRequired []string + }{ + { + name: "No instances with errant GTID", + sql: nil, + instancesRequired: nil, + }, { + name: "errant GTID", + sql: []string{ + "update database_instance set gtid_errant = '729a4cc4-8680-11ed-a104-47706090afbd:1' where alias = 'zone1-0000000112'", + }, + instancesRequired: []string{"zone1-0000000112"}, + }, { + name: "keyspace filtering - success", + keyspace: "ks", + sql: []string{ + "update database_instance set gtid_errant = '729a4cc4-8680-11ed-a104-47706090afbd:1' where alias = 'zone1-0000000112'", + }, + instancesRequired: []string{"zone1-0000000112"}, + }, { + name: "keyspace filtering - failure", + keyspace: "unknown", + sql: []string{ + "update database_instance set gtid_errant = '729a4cc4-8680-11ed-a104-47706090afbd:1' where alias = 'zone1-0000000112'", + }, + instancesRequired: nil, + }, { + name: "shard filtering - success", + keyspace: "ks", + shard: "0", + sql: []string{ + "update database_instance set gtid_errant = '729a4cc4-8680-11ed-a104-47706090afbd:1' where alias = 'zone1-0000000112'", + }, + instancesRequired: []string{"zone1-0000000112"}, + }, { + name: "shard filtering - failure", + keyspace: "ks", + shard: "unknown", + sql: []string{ + "update database_instance set gtid_errant = '729a4cc4-8680-11ed-a104-47706090afbd:1' where alias = 'zone1-0000000112'", + }, + instancesRequired: nil, + }, + } + + // We need to set InstancePollSeconds to a large value otherwise all the instances are reported as having problems since their last_checked is very old. + // Setting this value to a hundred years, we ensure that this test doesn't fail with this issue for the next hundred years. 
+ oldVal := config.Config.InstancePollSeconds + defer func() { + config.Config.InstancePollSeconds = oldVal + }() + config.Config.InstancePollSeconds = 60 * 60 * 24 * 365 * 100 + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Each test should clear the database. The easiest way to do that is to run all the initialization commands again + defer func() { + db.ClearVTOrcDatabase() + }() + + for _, query := range append(initialSQL, tt.sql...) { + _, err := db.ExecVTOrc(query) + require.NoError(t, err) + } + + instances, err := ReadInstancesWithErrantGTIds(tt.keyspace, tt.shard) + require.NoError(t, err) + var tabletAliases []string + for _, instance := range instances { + tabletAliases = append(tabletAliases, instance.InstanceAlias) + } + require.ElementsMatch(t, tabletAliases, tt.instancesRequired) + }) + } +} + // TestReadInstancesByCondition is used to test the functionality of readInstancesByCondition and verify its failure modes and successes. func TestReadInstancesByCondition(t *testing.T) { tests := []struct { diff --git a/go/vt/vtorc/server/api.go b/go/vt/vtorc/server/api.go index 66032d961ce..f053336e64e 100644 --- a/go/vt/vtorc/server/api.go +++ b/go/vt/vtorc/server/api.go @@ -41,6 +41,7 @@ type vtorcAPI struct{} const ( problemsAPI = "/api/problems" + errantGTIDsAPI = "/api/errant-gtids" disableGlobalRecoveriesAPI = "/api/disable-global-recoveries" enableGlobalRecoveriesAPI = "/api/enable-global-recoveries" replicationAnalysisAPI = "/api/replication-analysis" @@ -55,6 +56,7 @@ var ( apiHandler = &vtorcAPI{} vtorcAPIPaths = []string{ problemsAPI, + errantGTIDsAPI, disableGlobalRecoveriesAPI, enableGlobalRecoveriesAPI, replicationAnalysisAPI, @@ -80,6 +82,8 @@ func (v *vtorcAPI) ServeHTTP(response http.ResponseWriter, request *http.Request healthAPIHandler(response, request) case problemsAPI: problemsAPIHandler(response, request) + case errantGTIDsAPI: + errantGTIDsAPIHandler(response, request) case replicationAnalysisAPI: 
replicationAnalysisAPIHandler(response, request) case AggregatedDiscoveryMetricsAPI: @@ -94,7 +98,7 @@ func (v *vtorcAPI) ServeHTTP(response http.ResponseWriter, request *http.Request // getACLPermissionLevelForAPI returns the acl permission level that is required to run a given API func getACLPermissionLevelForAPI(apiEndpoint string) string { switch apiEndpoint { - case problemsAPI: + case problemsAPI, errantGTIDsAPI: return acl.MONITORING case disableGlobalRecoveriesAPI, enableGlobalRecoveriesAPI: return acl.ADMIN @@ -144,6 +148,24 @@ func problemsAPIHandler(response http.ResponseWriter, request *http.Request) { returnAsJSON(response, http.StatusOK, instances) } +// errantGTIDsAPIHandler is the handler for the errantGTIDsAPI endpoint +func errantGTIDsAPIHandler(response http.ResponseWriter, request *http.Request) { + // This api also supports filtering by shard and keyspace provided. + shard := request.URL.Query().Get("shard") + keyspace := request.URL.Query().Get("keyspace") + if shard != "" && keyspace == "" { + http.Error(response, shardWithoutKeyspaceFilteringErrorStr, http.StatusBadRequest) + return + } + + instances, err := inst.ReadInstancesWithErrantGTIds(keyspace, shard) + if err != nil { + http.Error(response, err.Error(), http.StatusInternalServerError) + return + } + returnAsJSON(response, http.StatusOK, instances) +} + // AggregatedDiscoveryMetricsAPIHandler is the handler for the discovery metrics endpoint func AggregatedDiscoveryMetricsAPIHandler(response http.ResponseWriter, request *http.Request) { // return metrics for last x seconds diff --git a/go/vt/vtorc/server/api_test.go b/go/vt/vtorc/server/api_test.go index 3c9b792afae..c352d1e600f 100644 --- a/go/vt/vtorc/server/api_test.go +++ b/go/vt/vtorc/server/api_test.go @@ -16,6 +16,9 @@ func TestGetACLPermissionLevelForAPI(t *testing.T) { { apiEndpoint: problemsAPI, want: acl.MONITORING, + }, { + apiEndpoint: errantGTIDsAPI, + want: acl.MONITORING, }, { apiEndpoint: disableGlobalRecoveriesAPI, 
want: acl.ADMIN,