Skip to content

Commit

Permalink
feat: replace errant gtid map metric with just a gauge and add an api…
Browse files Browse the repository at this point in the history
… instead

Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 committed Jul 31, 2023
1 parent 700e93e commit 6a61459
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 31 deletions.
1 change: 1 addition & 0 deletions go/cmd/vtorc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func main() {
servenv.ParseFlags("vtorc")
servenv.Init()
config.UpdateConfigValuesFromFlags()
inst.RegisterStats()

log.Info("starting vtorc")
if len(configFile) > 0 {
Expand Down
64 changes: 51 additions & 13 deletions go/test/endtoend/vtorc/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package api
import (
"encoding/json"
"fmt"
"math"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -200,22 +201,59 @@ func TestAPIEndpoints(t *testing.T) {
assert.Equal(t, 400, status, resp)
assert.Equal(t, "Filtering by shard without keyspace isn't supported\n", resp)

// Also verify that the metric for errant GTIDs is reporting the correct information.
_, resp, err = utils.MakeAPICall(t, vtorc, "/debug/vars")
// Also verify that we see the tablet in the errant GTIDs API call
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/errant-gtids")
require.NoError(t, err)
resultMap := make(map[string]any)
err = json.Unmarshal([]byte(resp), &resultMap)
assert.Equal(t, 200, status, resp)
assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias))

// Check that filtering using keyspace and shard works
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/errant-gtids?keyspace=ks&shard=0")
require.NoError(t, err)
assert.Equal(t, 200, status, resp)
assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias))

// Check that filtering using keyspace works
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/errant-gtids?keyspace=ks")
require.NoError(t, err)
assert.Equal(t, 200, status, resp)
assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias))

// Check that filtering using keyspace and shard works
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/errant-gtids?keyspace=ks&shard=80-")
require.NoError(t, err)
errantGTIDMap := reflect.ValueOf(resultMap["ErrantGtidMap"])
errantGtidTablets := errantGTIDMap.MapKeys()
require.Len(t, errantGtidTablets, 3)
assert.Equal(t, 200, status, resp)
assert.Equal(t, "null", resp)

errantGTIDinReplica := ""
for _, tabletKey := range errantGtidTablets {
if tabletKey.String() == replica.Alias {
errantGTIDinReplica = errantGTIDMap.MapIndex(tabletKey).Interface().(string)
// Check that filtering using just the shard fails
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/errant-gtids?shard=0")
require.NoError(t, err)
assert.Equal(t, 400, status, resp)
assert.Equal(t, "Filtering by shard without keyspace isn't supported\n", resp)

// Also verify that the metric for errant GTIDs is reporting the correct count.
waitForErrantGTIDCount(t, vtorc, 1)
})
}

func waitForErrantGTIDCount(t *testing.T, vtorc *cluster.VTOrcProcess, errantGTIDCountWanted int) {
timeout := time.After(15 * time.Second)
for {
select {
case <-timeout:
t.Fatalf("Timed out waiting for errant gtid count in the metrics to be %v", errantGTIDCountWanted)
return
default:
_, resp, err := utils.MakeAPICall(t, vtorc, "/debug/vars")
require.NoError(t, err)
resultMap := make(map[string]any)
err = json.Unmarshal([]byte(resp), &resultMap)
require.NoError(t, err)
errantGTIDTabletsCount := reflect.ValueOf(resultMap["ErrantGtidTabletCount"])
if int(math.Round(errantGTIDTabletsCount.Float())) == errantGTIDCountWanted {
return
}
time.Sleep(100 * time.Millisecond)
}
require.NotEmpty(t, errantGTIDinReplica)
})
}
}
37 changes: 20 additions & 17 deletions go/vt/vtorc/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@ const (
var instanceReadChan = make(chan bool, backendDBConcurrency)
var instanceWriteChan = make(chan bool, backendDBConcurrency)

var (
// Mutex to protect the access of the following variable
errantGtidMapMu = sync.Mutex{}
errantGtidMap = make(map[string]string)
)

var forgetAliases *cache.Cache

var accessDeniedCounter = metrics.NewCounter()
Expand All @@ -81,11 +75,6 @@ func init() {
_ = metrics.Register("instance.write", writeInstanceCounter)
_ = writeBufferLatency.AddMany([]string{"wait", "write"})
writeBufferLatency.Start("wait")
stats.NewStringMapFuncWithMultiLabels("ErrantGtidMap", "Metric to track the errant GTIDs detected by VTOrc", []string{"TabletAlias"}, "ErrantGtid", func() map[string]string {
errantGtidMapMu.Lock()
defer errantGtidMapMu.Unlock()
return errantGtidMap
})

go initializeInstanceDao()
}
Expand Down Expand Up @@ -156,6 +145,14 @@ func logReadTopologyInstanceError(tabletAlias string, hint string, err error) er
return fmt.Errorf(msg)
}

// RegisterStats registers stats from the inst package
func RegisterStats() {
stats.NewGaugeFunc("ErrantGtidTabletCount", "Number of tablets with errant GTIDs", func() int64 {
instances, _ := ReadInstancesWithErrantGTIds("", "")
return int64(len(instances))
})
}

// ReadTopologyInstance collects information on the state of a MySQL
// server and writes the result synchronously to the vtorc
// backend.
Expand Down Expand Up @@ -385,12 +382,6 @@ Cleanup:
instance.GtidErrant, err = vitessmysql.Subtract(redactedExecutedGtidSet.String(), redactedPrimaryExecutedGtidSet.String())
}
}
// update the errant gtid map
go func() {
errantGtidMapMu.Lock()
defer errantGtidMapMu.Unlock()
errantGtidMap[topoproto.TabletAliasString(tablet.Alias)] = instance.GtidErrant
}()
}

latency.Stop("instance")
Expand Down Expand Up @@ -682,6 +673,18 @@ func ReadProblemInstances(keyspace string, shard string) ([](*Instance), error)
return readInstancesByCondition(condition, args, "")
}

// ReadInstancesWithErrantGTIds reads all instances with errant GTIDs
func ReadInstancesWithErrantGTIds(keyspace string, shard string) ([]*Instance, error) {
condition := `
keyspace LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END)
and shard LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END)
and gtid_errant != ''
`

args := sqlutils.Args(keyspace, keyspace, shard, shard)
return readInstancesByCondition(condition, args, "")
}

// GetKeyspaceShardName gets the keyspace shard name for the given instance key
func GetKeyspaceShardName(tabletAlias string) (keyspace string, shard string, err error) {
query := `
Expand Down
84 changes: 84 additions & 0 deletions go/vt/vtorc/inst/instance_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,90 @@ func TestReadProblemInstances(t *testing.T) {
}
}

// TestReadInstancesWithErrantGTIds is used to test the functionality of ReadInstancesWithErrantGTIds and verify its failure modes and successes.
func TestReadInstancesWithErrantGTIds(t *testing.T) {
// The test is intended to be used as follows. The initial data is stored into the database. Following this, some specific queries are run that each individual test specifies to get the desired state.
tests := []struct {
name string
keyspace string
shard string
sql []string
instancesRequired []string
}{
{
name: "No instances with errant GTID",
sql: nil,
instancesRequired: nil,
}, {
name: "errant GTID",
sql: []string{
"update database_instance set gtid_errant = '729a4cc4-8680-11ed-a104-47706090afbd:1' where alias = 'zone1-0000000112'",
},
instancesRequired: []string{"zone1-0000000112"},
}, {
name: "keyspace filtering - success",
keyspace: "ks",
sql: []string{
"update database_instance set gtid_errant = '729a4cc4-8680-11ed-a104-47706090afbd:1' where alias = 'zone1-0000000112'",
},
instancesRequired: []string{"zone1-0000000112"},
}, {
name: "keyspace filtering - failure",
keyspace: "unknown",
sql: []string{
"update database_instance set gtid_errant = '729a4cc4-8680-11ed-a104-47706090afbd:1' where alias = 'zone1-0000000112'",
},
instancesRequired: nil,
}, {
name: "shard filtering - success",
keyspace: "ks",
shard: "0",
sql: []string{
"update database_instance set gtid_errant = '729a4cc4-8680-11ed-a104-47706090afbd:1' where alias = 'zone1-0000000112'",
},
instancesRequired: []string{"zone1-0000000112"},
}, {
name: "shard filtering - failure",
keyspace: "ks",
shard: "unknown",
sql: []string{
"update database_instance set gtid_errant = '729a4cc4-8680-11ed-a104-47706090afbd:1' where alias = 'zone1-0000000112'",
},
instancesRequired: nil,
},
}

// We need to set InstancePollSeconds to a large value otherwise all the instances are reported as having problems since their last_checked is very old.
// Setting this value to a hundred years, we ensure that this test doesn't fail with this issue for the next hundred years.
oldVal := config.Config.InstancePollSeconds
defer func() {
config.Config.InstancePollSeconds = oldVal
}()
config.Config.InstancePollSeconds = 60 * 60 * 24 * 365 * 100

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Each test should clear the database. The easiest way to do that is to run all the initialization commands again
defer func() {
db.ClearVTOrcDatabase()
}()

for _, query := range append(initialSQL, tt.sql...) {
_, err := db.ExecVTOrc(query)
require.NoError(t, err)
}

instances, err := ReadInstancesWithErrantGTIds(tt.keyspace, tt.shard)
require.NoError(t, err)
var tabletAliases []string
for _, instance := range instances {
tabletAliases = append(tabletAliases, instance.InstanceAlias)
}
require.ElementsMatch(t, tabletAliases, tt.instancesRequired)
})
}
}

// TestReadInstancesByCondition is used to test the functionality of readInstancesByCondition and verify its failure modes and successes.
func TestReadInstancesByCondition(t *testing.T) {
tests := []struct {
Expand Down
24 changes: 23 additions & 1 deletion go/vt/vtorc/server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type vtorcAPI struct{}

const (
problemsAPI = "/api/problems"
errantGTIDsAPI = "/api/errant-gtids"
disableGlobalRecoveriesAPI = "/api/disable-global-recoveries"
enableGlobalRecoveriesAPI = "/api/enable-global-recoveries"
replicationAnalysisAPI = "/api/replication-analysis"
Expand All @@ -55,6 +56,7 @@ var (
apiHandler = &vtorcAPI{}
vtorcAPIPaths = []string{
problemsAPI,
errantGTIDsAPI,
disableGlobalRecoveriesAPI,
enableGlobalRecoveriesAPI,
replicationAnalysisAPI,
Expand All @@ -80,6 +82,8 @@ func (v *vtorcAPI) ServeHTTP(response http.ResponseWriter, request *http.Request
healthAPIHandler(response, request)
case problemsAPI:
problemsAPIHandler(response, request)
case errantGTIDsAPI:
errantGTIDsAPIHandler(response, request)
case replicationAnalysisAPI:
replicationAnalysisAPIHandler(response, request)
case AggregatedDiscoveryMetricsAPI:
Expand All @@ -94,7 +98,7 @@ func (v *vtorcAPI) ServeHTTP(response http.ResponseWriter, request *http.Request
// getACLPermissionLevelForAPI returns the acl permission level that is required to run a given API
func getACLPermissionLevelForAPI(apiEndpoint string) string {
switch apiEndpoint {
case problemsAPI:
case problemsAPI, errantGTIDsAPI:
return acl.MONITORING
case disableGlobalRecoveriesAPI, enableGlobalRecoveriesAPI:
return acl.ADMIN
Expand Down Expand Up @@ -144,6 +148,24 @@ func problemsAPIHandler(response http.ResponseWriter, request *http.Request) {
returnAsJSON(response, http.StatusOK, instances)
}

// errantGTIDsAPIHandler is the handler for the errantGTIDsAPI endpoint
func errantGTIDsAPIHandler(response http.ResponseWriter, request *http.Request) {
// This api also supports filtering by shard and keyspace provided.
shard := request.URL.Query().Get("shard")
keyspace := request.URL.Query().Get("keyspace")
if shard != "" && keyspace == "" {
http.Error(response, shardWithoutKeyspaceFilteringErrorStr, http.StatusBadRequest)
return
}

instances, err := inst.ReadInstancesWithErrantGTIds(keyspace, shard)
if err != nil {
http.Error(response, err.Error(), http.StatusInternalServerError)
return
}
returnAsJSON(response, http.StatusOK, instances)
}

// AggregatedDiscoveryMetricsAPIHandler is the handler for the discovery metrics endpoint
func AggregatedDiscoveryMetricsAPIHandler(response http.ResponseWriter, request *http.Request) {
// return metrics for last x seconds
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtorc/server/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ func TestGetACLPermissionLevelForAPI(t *testing.T) {
{
apiEndpoint: problemsAPI,
want: acl.MONITORING,
}, {
apiEndpoint: errantGTIDsAPI,
want: acl.MONITORING,
}, {
apiEndpoint: disableGlobalRecoveriesAPI,
want: acl.ADMIN,
Expand Down

0 comments on commit 6a61459

Please sign in to comment.