From 19a1176601b595b8acb6f846627d357d30b021c0 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Tue, 25 Apr 2023 13:33:23 -0400 Subject: [PATCH 01/10] go/cmd: vtbackup_duration_by_phase => vtbackup_phase Signed-off-by: Max Englander --- go/cmd/vtbackup/vtbackup.go | 45 +++++++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/go/cmd/vtbackup/vtbackup.go b/go/cmd/vtbackup/vtbackup.go index a2f06753f0e..158d9671fb4 100644 --- a/go/cmd/vtbackup/vtbackup.go +++ b/go/cmd/vtbackup/vtbackup.go @@ -98,6 +98,11 @@ const ( // place a hard cap on the overall time for a backup, while also not waiting // forever for things that should be quick. operationTimeout = 1 * time.Minute + + phaseNameCatchUpReplication = "CatchUpReplication" + phaseNameInitialBackup = "InitialBackup" + phaseNameRestoreLastBackup = "RestoreLastBackup" + phaseNameTakeNewBackup = "TakeNewBackup" ) var ( @@ -121,11 +126,17 @@ var ( detachedMode bool keepAliveTimeout = 0 * time.Second disableRedoLog = false - durationByPhase = stats.NewGaugesWithSingleLabel( - "DurationByPhaseSeconds", - "How long it took vtbackup to perform each phase (in seconds).", + phase = stats.NewGaugesWithSingleLabel( + "Phase", + "Active phase.", "phase", ) + phaseNames = []string{ + phaseNameCatchUpReplication, + phaseNameInitialBackup, + phaseNameRestoreLastBackup, + phaseNameTakeNewBackup, + } ) func registerFlags(fs *pflag.FlagSet) { @@ -200,6 +211,11 @@ func main() { topoServer := topo.Open() defer topoServer.Close() + // Initialize stats. + for _, phaseName := range phaseNames { + phase.Set(phaseName, int64(0)) + } + // Try to take a backup, if it's been long enough since the last one. // Skip pruning if backup wasn't fully successful. We don't want to be // deleting things if the backup process is not healthy. @@ -265,11 +281,9 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back } initCtx, initCancel := context.WithTimeout(ctx, mysqlTimeout) defer initCancel() - initMysqldAt := time.Now() if err := mysqld.Init(initCtx, mycnf, initDBSQLFile); err != nil { return fmt.Errorf("failed to initialize mysql data dir and start mysqld: %v", err) } - durationByPhase.Set("InitMySQLd", int64(time.Since(initMysqldAt).Seconds())) // Shut down mysqld when we're done. defer func() { // Be careful not to use the original context, because we don't want to @@ -330,19 +344,21 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back return err } - backupParams.BackupTime = time.Now() // Now we're ready to take the backup. 
+ phase.Set(phaseNameInitialBackup, int64(1)) + defer phase.Set(phaseNameInitialBackup, int64(0)) if err := mysqlctl.Backup(ctx, backupParams); err != nil { return fmt.Errorf("backup failed: %v", err) } - durationByPhase.Set("InitialBackup", int64(time.Since(backupParams.BackupTime).Seconds())) log.Info("Initial backup successful.") + phase.Set(phaseNameInitialBackup, int64(0)) return nil } + phase.Set(phaseNameRestoreLastBackup, int64(1)) + defer phase.Set(phaseNameRestoreLastBackup, int64(0)) backupDir := mysqlctl.GetBackupDir(initKeyspace, initShard) log.Infof("Restoring latest backup from directory %v", backupDir) - restoreAt := time.Now() params := mysqlctl.RestoreParams{ Cnf: mycnf, Mysqld: mysqld, @@ -371,7 +387,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back default: return fmt.Errorf("can't restore from backup: %v", err) } - durationByPhase.Set("RestoreLastBackup", int64(time.Since(restoreAt).Seconds())) + phase.Set(phaseNameRestoreLastBackup, int64(0)) // As of MySQL 8.0.21, you can disable redo logging using the ALTER INSTANCE // DISABLE INNODB REDO_LOG statement. This functionality is intended for @@ -429,6 +445,8 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back backupParams.BackupTime = time.Now() // Wait for replication to catch up. + phase.Set(phaseNameCatchUpReplication, int64(1)) + defer phase.Set(phaseNameCatchUpReplication, int64(0)) waitStartTime := time.Now() for { select { @@ -446,7 +464,6 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back // We're caught up on replication to at least the point the primary // was at when this vtbackup run started. log.Infof("Replication caught up to %v after %v", status.Position, time.Since(waitStartTime)) - durationByPhase.Set("CatchUpReplication", int64(time.Since(waitStartTime).Seconds())) break } if !status.Healthy() { @@ -456,6 +473,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back } } } + phase.Set(phaseNameCatchUpReplication, int64(0)) // Stop replication and see where we are. if err := mysqld.StopReplication(nil); err != nil { @@ -480,7 +498,6 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back } if restartBeforeBackup { - restartAt := time.Now() log.Info("Proceeding with clean MySQL shutdown and startup to flush all buffers.") // Prep for full/clean shutdown (not typically the default) if err := mysqld.ExecuteSuperQuery(ctx, "SET GLOBAL innodb_fast_shutdown=0"); err != nil { @@ -494,15 +511,15 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back if err := mysqld.Start(ctx, mycnf); err != nil { return fmt.Errorf("Could not start MySQL after full shutdown: %v", err) } - durationByPhase.Set("RestartBeforeBackup", int64(time.Since(restartAt).Seconds())) } // Now we can take a new backup. - backupAt := time.Now() + phase.Set(phaseNameTakeNewBackup, int64(1)) + defer phase.Set(phaseNameTakeNewBackup, int64(0)) if err := mysqlctl.Backup(ctx, backupParams); err != nil { return fmt.Errorf("error taking backup: %v", err) } - durationByPhase.Set("TakeNewBackup", int64(time.Since(backupAt).Seconds())) + phase.Set(phaseNameTakeNewBackup, int64(0)) // Return a non-zero exit code if we didn't meet the replication position // goal, even though we took a backup that pushes the high-water mark up. 
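The patch above replaces the per-phase duration gauge with a binary `Phase` gauge: each phase is set to 1 when it starts and back to 0 (via `defer`) when it ends, and at most one phase is active at a time. Below is a minimal standalone sketch of that pattern using the same `stats.NewGaugesWithSingleLabel` API; it assumes the Vitess Go module is available, and the `inPhase` helper and its caller are illustrative names that do not appear in the patch.

```go
package main

import (
	"time"

	"vitess.io/vitess/go/stats"
)

// phase mirrors the gauge declared in the patch: a single "phase" label,
// set to 1 while a phase is running and back to 0 when it finishes.
var phase = stats.NewGaugesWithSingleLabel("Phase", "Active phase.", "phase")

// inPhase is an illustrative helper (not part of the patch) that wraps the
// repeated "Set(name, 1); defer Set(name, 0)" pattern used in takeBackup.
func inPhase(name string, fn func() error) error {
	phase.Set(name, 1)
	defer phase.Set(name, 0)
	return fn()
}

func main() {
	_ = inPhase("TakeNewBackup", func() error {
		time.Sleep(100 * time.Millisecond) // stand-in for the real backup work
		return nil
	})
}
```

Because the gauge is binary, per-phase durations are recovered on the monitoring side by integrating the 0/1 samples over time, which is what the release-note changes later in this series describe.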
From a2bbaa355fbba19b2e47fc456e64043d2ddd6811 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Tue, 25 Apr 2023 13:46:50 -0400 Subject: [PATCH 02/10] update release notes Signed-off-by: Max Englander --- changelog/17.0/17.0.0/summary.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/changelog/17.0/17.0.0/summary.md b/changelog/17.0/17.0.0/summary.md index fdb1763a412..aa58ae6b89f 100644 --- a/changelog/17.0/17.0.0/summary.md +++ b/changelog/17.0/17.0.0/summary.md @@ -210,11 +210,11 @@ These operations are counted and timed, and the number of bytes consumed or prod Vtbackup exports some metrics which are not available elsewhere. -**DurationByPhaseSeconds** +**Phase** Vtbackup fetches the last backup, restores it to an empty mysql installation, replicates recent changes into that installation, and then takes a backup of that installation. -_DurationByPhaseSeconds_ exports timings for these individual phases. +_Phase_ a binary-valued gauge that reports the currently active phase. ##### Example @@ -250,11 +250,11 @@ _DurationByPhaseSeconds_ exports timings for these individual phases. "BackupEngine.Builtin.Destination:Close": 17144630, "BackupStorage.File.File:Write": 10743169 }, - "DurationByPhaseSeconds": { - "InitMySQLd": 2, - "RestoreLastBackup": 6, - "CatchUpReplication": 1, - "TakeNewBackup": 4 + "Phase": { + "InitMySQLd": 0, + "RestoreLastBackup": 0, + "CatchUpReplication": 0, + "TakeNewBackup": 0 }, "RestoreBytes": { "BackupEngine.Builtin.Source:Read": 1095, @@ -290,8 +290,8 @@ _DurationByPhaseSeconds_ exports timings for these individual phases. Some notes to help understand these metrics: * `BackupBytes["BackupStorage.File.File:Write"]` measures how many bytes were read from disk by the `file` Backup Storage implementation during the backup phase. -* `DurationByPhaseSeconds["CatchUpReplication"]` measures how long it took to catch-up replication after the restore phase. -* `DurationByPhaseSeconds["RestoreLastBackup"]` measures to the duration of the restore phase. +* `Phase["CatchUpReplication"]` reports whether the catch-up replication phase is active (1) or not (0). +* `Phase["RestoreLastBackup"]` reports whether the restore last backup phase is active (1) or not (0). * `RestoreDurationNanoseconds["-.-.Restore"]` also measures to the duration of the restore phase. 
#### VTTablet error count with error code From 3d076ff09c0dc20f6ff08f2563befe4550e28da4 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Sun, 28 May 2023 16:17:29 -0400 Subject: [PATCH 03/10] send stats to file during vtbackup test, verify afterwards Signed-off-by: Max Englander --- go/cmd/vtbackup/plugin_opentsdb.go | 25 +++ go/cmd/vtbackup/vtbackup.go | 12 +- go/stats/counters.go | 23 ++ go/stats/export.go | 28 ++- go/stats/opentsdb/backend.go | 57 +++++ go/stats/opentsdb/by_metric.go | 54 +++++ .../opentsdb/{opentsdb.go => collector.go} | 212 ++++-------------- go/stats/opentsdb/datapoint.go | 90 ++++++++ go/stats/opentsdb/datapoint_reader.go | 53 +++++ go/stats/opentsdb/doc.go | 18 ++ go/stats/opentsdb/file_writer.go | 52 +++++ go/stats/opentsdb/flags.go | 38 ++++ go/stats/opentsdb/http_writer.go | 51 +++++ go/stats/opentsdb/init.go | 104 +++++++++ go/stats/opentsdb/opentsdb_test.go | 13 +- go/stats/opentsdb/writer.go | 21 ++ go/stats/statsd/statsd.go | 14 +- .../backup/vtbackup/backup_only_test.go | 103 ++++++++- go/test/endtoend/cluster/vtbackup_process.go | 3 +- 19 files changed, 776 insertions(+), 195 deletions(-) create mode 100644 go/cmd/vtbackup/plugin_opentsdb.go create mode 100644 go/stats/opentsdb/backend.go create mode 100644 go/stats/opentsdb/by_metric.go rename go/stats/opentsdb/{opentsdb.go => collector.go} (54%) create mode 100644 go/stats/opentsdb/datapoint.go create mode 100644 go/stats/opentsdb/datapoint_reader.go create mode 100644 go/stats/opentsdb/doc.go create mode 100644 go/stats/opentsdb/file_writer.go create mode 100644 go/stats/opentsdb/flags.go create mode 100644 go/stats/opentsdb/http_writer.go create mode 100644 go/stats/opentsdb/init.go create mode 100644 go/stats/opentsdb/writer.go diff --git a/go/cmd/vtbackup/plugin_opentsdb.go b/go/cmd/vtbackup/plugin_opentsdb.go new file mode 100644 index 00000000000..44ac886d420 --- /dev/null +++ b/go/cmd/vtbackup/plugin_opentsdb.go @@ -0,0 +1,25 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import "vitess.io/vitess/go/stats/opentsdb" + +// This plugin imports opentsdb to register the opentsdb stats backend. + +func init() { + opentsdb.Init("vtbackup") +} diff --git a/go/cmd/vtbackup/vtbackup.go b/go/cmd/vtbackup/vtbackup.go index 158d9671fb4..6fe02ac2e80 100644 --- a/go/cmd/vtbackup/vtbackup.go +++ b/go/cmd/vtbackup/vtbackup.go @@ -126,7 +126,17 @@ var ( detachedMode bool keepAliveTimeout = 0 * time.Second disableRedoLog = false - phase = stats.NewGaugesWithSingleLabel( + // This gauge is updated 3*N times during the course of a vtbackup run, + // where N is the number of different phases vtbackup transitions through. + // Once to initialize to 0, another time to set the phase to active (1), + // and another to deactivate the phase (back to 0). + // + // At most a single phase is active at a given time. + // + // The sync gauge immediately reports changes to push-backed backends. 
+ // The benefit of the sync gauge is that it makes verifying stats in + // integration tests a lot more tractable. + phase = stats.NewSyncGaugesWithSingleLabel( "Phase", "Active phase.", "phase", diff --git a/go/stats/counters.go b/go/stats/counters.go index f144c0ce3dd..ef5cbc7ffae 100644 --- a/go/stats/counters.go +++ b/go/stats/counters.go @@ -323,6 +323,29 @@ func (g *GaugesWithSingleLabel) Set(name string, value int64) { g.counters.set(name, value) } +// SyncGaugesWithSingleLabel is a GaugesWithSingleLabel that proactively pushes +// stats to push-based backends when Set is called. +type SyncGaugesWithSingleLabel struct { + GaugesWithSingleLabel + name string +} + +// NewSyncGaugesWithSingleLabel creates a new SyncGaugesWithSingleLabel. +func NewSyncGaugesWithSingleLabel(name, help, label string, tags ...string) *SyncGaugesWithSingleLabel { + return &SyncGaugesWithSingleLabel{ + GaugesWithSingleLabel: *NewGaugesWithSingleLabel(name, help, label, tags...), + name: name, + } +} + +// Set sets the value of a named gauge. +func (sg *SyncGaugesWithSingleLabel) Set(name string, value int64) { + sg.GaugesWithSingleLabel.Set(name, value) + if sg.name != "" { + _ = pushOne(sg.name, &sg.GaugesWithSingleLabel) + } +} + // GaugesWithMultiLabels is a CountersWithMultiLabels implementation where // the values can go up and down. type GaugesWithMultiLabels struct { diff --git a/go/stats/export.go b/go/stats/export.go index e98ef0a969c..bdca24100ba 100644 --- a/go/stats/export.go +++ b/go/stats/export.go @@ -121,6 +121,22 @@ func Publish(name string, v expvar.Var) { publish(name, v) } +func pushAll() error { + backend, ok := pushBackends[statsBackend] + if !ok { + return fmt.Errorf("no PushBackend registered with name %s", statsBackend) + } + return backend.PushAll() +} + +func pushOne(name string, v expvar.Var) error { + backend, ok := pushBackends[statsBackend] + if !ok { + return fmt.Errorf("no PushBackend registered with name %s", statsBackend) + } + return backend.PushOne(name, v) +} + // StringMapFuncWithMultiLabels is a multidimensional string map publisher. // // Map keys are compound names made with joining multiple strings with '.', @@ -183,8 +199,10 @@ func publish(name string, v expvar.Var) { // to be pushed to it. It's used to support push-based metrics backends, as expvar // by default only supports pull-based ones. type PushBackend interface { - // PushAll pushes all stats from expvar to the backend + // PushAll pushes all stats from expvar to the backend. PushAll() error + // PushOne pushes a single stat from expvar to the backend. + PushOne(name string, v expvar.Var) error } var pushBackends = make(map[string]PushBackend) @@ -214,13 +232,7 @@ func emitToBackend(emitPeriod *time.Duration) { ticker := time.NewTicker(*emitPeriod) defer ticker.Stop() for range ticker.C { - backend, ok := pushBackends[statsBackend] - if !ok { - log.Errorf("No PushBackend registered with name %s", statsBackend) - return - } - err := backend.PushAll() - if err != nil { + if err := pushAll(); err != nil { // TODO(aaijazi): This might cause log spam... log.Warningf("Pushing stats to backend %v failed: %v", statsBackend, err) } diff --git a/go/stats/opentsdb/backend.go b/go/stats/opentsdb/backend.go new file mode 100644 index 00000000000..04de028e33b --- /dev/null +++ b/go/stats/opentsdb/backend.go @@ -0,0 +1,57 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package opentsdb + +import ( + "expvar" + "time" +) + +// backend implements stats.PushBackend +type backend struct { + // The prefix is the name of the binary (vtgate, vttablet, etc.) and will be + // prepended to all the stats reported. + prefix string + // Tags that should be included with every data point. If there's a tag name + // collision between the common tags and a single data point's tags, the data + // point tag will override the common tag. + commonTags map[string]string + // writer is used to send data points somewhere (file, http, ...). + writer writer +} + +// PushAll pushes all stats to OpenTSDB +func (b *backend) PushAll() error { + collector := b.collector() + collector.collectAll() + return b.writer.Write(collector.data) +} + +// PushOne pushes a single stat to OpenTSDB +func (b *backend) PushOne(name string, v expvar.Var) error { + collector := b.collector() + collector.collectOne(name, v) + return b.writer.Write(collector.data) +} + +func (b *backend) collector() *collector { + return &collector{ + commonTags: b.commonTags, + prefix: b.prefix, + timestamp: time.Now().Unix(), + } +} diff --git a/go/stats/opentsdb/by_metric.go b/go/stats/opentsdb/by_metric.go new file mode 100644 index 00000000000..5177109a18e --- /dev/null +++ b/go/stats/opentsdb/by_metric.go @@ -0,0 +1,54 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package opentsdb + +// byMetric implements sort.Interface for []*DataPoint based on the metric key +// and then tag values (prioritized in tag name order). Having a consistent sort order +// is convenient when refreshing /debug/opentsdb or for encoding and comparing JSON directly +// in the tests. +type byMetric []*DataPoint + +func (m byMetric) Len() int { return len(m) } +func (m byMetric) Swap(i, j int) { m[i], m[j] = m[j], m[i] } +func (m byMetric) Less(i, j int) bool { + if m[i].Metric < m[j].Metric { + return true + } + + if m[i].Metric > m[j].Metric { + return false + } + + // Metric names are the same. We can use tag values to figure out the sort order. + // The deciding tag will be the lexicographically earliest tag name where tag values differ. + decidingTagName := "" + result := false + for tagName, iVal := range m[i].Tags { + jVal, ok := m[j].Tags[tagName] + if !ok { + // We'll arbitrarily declare that if i has any tag name that j doesn't then it sorts earlier. + // This shouldn't happen in practice, though, if metric code is correct... 
+ return true + } + + if iVal != jVal && (tagName < decidingTagName || decidingTagName == "") { + decidingTagName = tagName + result = iVal < jVal + } + } + return result +} diff --git a/go/stats/opentsdb/opentsdb.go b/go/stats/opentsdb/collector.go similarity index 54% rename from go/stats/opentsdb/opentsdb.go rename to go/stats/opentsdb/collector.go index 3e85052b5f4..9b870815067 100644 --- a/go/stats/opentsdb/opentsdb.go +++ b/go/stats/opentsdb/collector.go @@ -14,151 +14,47 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package opentsdb adds support for pushing stats to opentsdb. package opentsdb import ( "bytes" "encoding/json" "expvar" - "net/http" - "sort" "strings" - "time" "unicode" - "github.com/spf13/pflag" - "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/vt/servenv" ) -var openTsdbURI string - -func registerFlags(fs *pflag.FlagSet) { - fs.StringVar(&openTsdbURI, "opentsdb_uri", openTsdbURI, "URI of opentsdb /api/put method") -} - -func init() { - servenv.OnParseFor("vtctld", registerFlags) - servenv.OnParseFor("vtgate", registerFlags) - servenv.OnParseFor("vttablet", registerFlags) -} - -// dataPoint represents a single OpenTSDB data point. -type dataPoint struct { - // Example: sys.cpu.nice - Metric string `json:"metric"` - // Seconds or milliseconds since unix epoch. - Timestamp float64 `json:"timestamp"` - Value float64 `json:"value"` - Tags map[string]string `json:"tags"` -} - -// sendDataPoints pushes a list of data points to openTSDB. -// All other code in this file is just to support getting this function called -// with all stats represented as data points. -func sendDataPoints(data []dataPoint) error { - json, err := json.Marshal(data) - if err != nil { - return err - } - - resp, err := http.Post(openTsdbURI, "application/json", bytes.NewReader(json)) - if err != nil { - return err - } - resp.Body.Close() - return nil -} - -// openTSDBBackend implements stats.PushBackend -type openTSDBBackend struct { - // The prefix is the name of the binary (vtgate, vttablet, etc.) and will be - // prepended to all the stats reported. - prefix string - // Tags that should be included with every data point. If there's a tag name - // collision between the common tags and a single data point's tags, the data - // point tag will override the common tag. +// collector tracks state for a single pass of stats reporting / data collection. +type collector struct { commonTags map[string]string -} - -// dataCollector tracks state for a single pass of stats reporting / data collection. -type dataCollector struct { - settings *openTSDBBackend + data []*DataPoint + prefix string timestamp int64 - dataPoints []dataPoint -} - -// Init attempts to create a singleton openTSDBBackend and register it as a PushBackend. -// If it fails to create one, this is a noop. The prefix argument is an optional string -// to prepend to the name of every data point reported. 
-func Init(prefix string) { - // Needs to happen in servenv.OnRun() instead of init because it requires flag parsing and logging - servenv.OnRun(func() { - InitWithoutServenv(prefix) - }) -} - -// InitWithoutServenv initializes the opentsdb without servenv -func InitWithoutServenv(prefix string) { - if openTsdbURI == "" { - return - } - - backend := &openTSDBBackend{ - prefix: prefix, - commonTags: stats.ParseCommonTags(stats.CommonTags), - } - - stats.RegisterPushBackend("opentsdb", backend) - - servenv.HTTPHandleFunc("/debug/opentsdb", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json; charset=utf-8") - dataPoints := (*backend).getDataPoints() - sort.Sort(byMetric(dataPoints)) - - if b, err := json.MarshalIndent(dataPoints, "", " "); err != nil { - w.Write([]byte(err.Error())) - } else { - w.Write(b) - } - }) } -// PushAll pushes all stats to OpenTSDB -func (backend *openTSDBBackend) PushAll() error { - return sendDataPoints(backend.getDataPoints()) -} - -// getDataPoints fetches all stats in an opentsdb-compatible format. -// This is separated from PushAll() so it can be reused for the /debug/opentsdb handler. -func (backend *openTSDBBackend) getDataPoints() []dataPoint { - dataCollector := &dataCollector{ - settings: backend, - timestamp: time.Now().Unix(), - } - +func (dc *collector) collectAll() { expvar.Do(func(kv expvar.KeyValue) { - dataCollector.addExpVar(kv) + dc.addExpVar(kv) }) - - return dataCollector.dataPoints } -// combineMetricName joins parts of a hierarchical name with a "." -func combineMetricName(parts ...string) string { - return strings.Join(parts, ".") +func (dc *collector) collectOne(name string, v expvar.Var) { + dc.addExpVar(expvar.KeyValue{ + Key: name, + Value: v, + }) } -func (dc *dataCollector) addInt(metric string, val int64, tags map[string]string) { +func (dc *collector) addInt(metric string, val int64, tags map[string]string) { dc.addFloat(metric, float64(val), tags) } -func (dc *dataCollector) addFloat(metric string, val float64, tags map[string]string) { +func (dc *collector) addFloat(metric string, val float64, tags map[string]string) { var fullMetric string - if len(dc.settings.prefix) > 0 { - fullMetric = combineMetricName(dc.settings.prefix, metric) + if len(dc.prefix) > 0 { + fullMetric = combineMetricName(dc.prefix, metric) } else { fullMetric = metric } @@ -182,20 +78,20 @@ func (dc *dataCollector) addFloat(metric string, val float64, tags map[string]st } fullTags := make(map[string]string) - for k, v := range dc.settings.commonTags { + for k, v := range dc.commonTags { fullTags[sanitize(k)] = sanitize(v) } for k, v := range tags { fullTags[sanitize(k)] = sanitize(v) } - dp := dataPoint{ + dp := &DataPoint{ Metric: sanitize(fullMetric), Value: val, Timestamp: float64(dc.timestamp), Tags: fullTags, } - dc.dataPoints = append(dc.dataPoints, dp) + dc.data = append(dc.data, dp) } // addExpVar adds all the data points associated with a particular expvar to the list of @@ -206,7 +102,7 @@ func (dc *dataCollector) addFloat(metric string, val float64, tags map[string]st // // Generic unrecognized expvars are serialized to json and their int/float values are exported. // Strings and lists in expvars are not exported. 
-func (dc *dataCollector) addExpVar(kv expvar.KeyValue) { +func (dc *collector) addExpVar(kv expvar.KeyValue) { k := kv.Key switch v := kv.Value.(type) { case stats.FloatFunc: @@ -268,24 +164,8 @@ func (dc *dataCollector) addExpVar(kv expvar.KeyValue) { } } -// makeLabel builds a tag list with a single label + value. -func makeLabel(labelName string, labelVal string) map[string]string { - return map[string]string{labelName: labelVal} -} - -// makeLabels takes the vitess stat representation of label values ("."-separated list) and breaks it -// apart into a map of label name -> label value. -func makeLabels(labelNames []string, labelValsCombined string) map[string]string { - tags := make(map[string]string) - labelVals := strings.Split(labelValsCombined, ".") - for i, v := range labelVals { - tags[labelNames[i]] = v - } - return tags -} - // addUnrecognizedExpvars recurses into a json object to pull out float64 variables to report. -func (dc *dataCollector) addUnrecognizedExpvars(prefix string, obj map[string]any) { +func (dc *collector) addUnrecognizedExpvars(prefix string, obj map[string]any) { for k, v := range obj { prefix := combineMetricName(prefix, k) switch v := v.(type) { @@ -298,7 +178,7 @@ func (dc *dataCollector) addUnrecognizedExpvars(prefix string, obj map[string]an } // addTimings converts a vitess Timings stat to something opentsdb can deal with. -func (dc *dataCollector) addTimings(labels []string, timings *stats.Timings, prefix string) { +func (dc *collector) addTimings(labels []string, timings *stats.Timings, prefix string) { histograms := timings.Histograms() for labelValsCombined, histogram := range histograms { // If you prefer millisecond timings over nanoseconds you can pass 1000000 here instead of 1. @@ -306,7 +186,7 @@ func (dc *dataCollector) addTimings(labels []string, timings *stats.Timings, pre } } -func (dc *dataCollector) addHistogram(histogram *stats.Histogram, divideBy int64, prefix string, tags map[string]string) { +func (dc *collector) addHistogram(histogram *stats.Histogram, divideBy int64, prefix string, tags map[string]string) { // TODO: OpenTSDB 2.3 doesn't have histogram support, although it's forthcoming. // For simplicity we report each bucket as a different metric. // @@ -335,39 +215,23 @@ func (dc *dataCollector) addHistogram(histogram *stats.Histogram, divideBy int64 ) } -// byMetric implements sort.Interface for []dataPoint based on the metric key -// and then tag values (prioritized in tag name order). Having a consistent sort order -// is convenient when refreshing /debug/opentsdb or for encoding and comparing JSON directly -// in the tests. -type byMetric []dataPoint - -func (m byMetric) Len() int { return len(m) } -func (m byMetric) Swap(i, j int) { m[i], m[j] = m[j], m[i] } -func (m byMetric) Less(i, j int) bool { - if m[i].Metric < m[j].Metric { - return true - } - - if m[i].Metric > m[j].Metric { - return false - } +// combineMetricName joins parts of a hierarchical name with a "." +func combineMetricName(parts ...string) string { + return strings.Join(parts, ".") +} - // Metric names are the same. We can use tag values to figure out the sort order. - // The deciding tag will be the lexicographically earliest tag name where tag values differ. - decidingTagName := "" - result := false - for tagName, iVal := range m[i].Tags { - jVal, ok := m[j].Tags[tagName] - if !ok { - // We'll arbitrarily declare that if i has any tag name that j doesn't then it sorts earlier. - // This shouldn't happen in practice, though, if metric code is correct... 
- return true - } +// makeLabel builds a tag list with a single label + value. +func makeLabel(labelName string, labelVal string) map[string]string { + return map[string]string{labelName: labelVal} +} - if iVal != jVal && (tagName < decidingTagName || decidingTagName == "") { - decidingTagName = tagName - result = iVal < jVal - } +// makeLabels takes the vitess stat representation of label values ("."-separated list) and breaks it +// apart into a map of label name -> label value. +func makeLabels(labelNames []string, labelValsCombined string) map[string]string { + tags := make(map[string]string) + labelVals := strings.Split(labelValsCombined, ".") + for i, v := range labelVals { + tags[labelNames[i]] = v } - return result + return tags } diff --git a/go/stats/opentsdb/datapoint.go b/go/stats/opentsdb/datapoint.go new file mode 100644 index 00000000000..42e69b84d47 --- /dev/null +++ b/go/stats/opentsdb/datapoint.go @@ -0,0 +1,90 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package opentsdb + +import ( + "fmt" + "strconv" + "strings" +) + +// DataPoint represents a single OpenTSDB data point. +type DataPoint struct { + // Example: sys.cpu.nice + Metric string `json:"metric"` + // Seconds or milliseconds since unix epoch. + Timestamp float64 `json:"timestamp"` + Value float64 `json:"value"` + Tags map[string]string `json:"tags"` +} + +func (dp *DataPoint) MarshalText() (string, error) { + var sb strings.Builder + + if _, err := sb.WriteString(fmt.Sprintf("%s %f %f", dp.Metric, dp.Timestamp, dp.Value)); err != nil { + return "", err + } + + for k, v := range dp.Tags { + if _, err := sb.WriteString(fmt.Sprintf(" %s=%s", k, v)); err != nil { + return "", err + } + } + + if _, err := sb.WriteString("\n"); err != nil { + return "", err + } + + return sb.String(), nil +} + +func unmarshalTextToData(dp *DataPoint, text []byte) error { + parts := strings.Split(string(text), " ") + + if len(parts) < 3 { + // Technically every OpenTSDB time series requires at least one tag, + // but some of the metrics we send have zero. + return fmt.Errorf("require format: [ ]") + } + + dp.Metric = parts[0] + + timestamp, err := strconv.ParseFloat(parts[1], 64) + if err != nil { + return err + } + dp.Timestamp = timestamp + + value, err := strconv.ParseFloat(parts[2], 64) + if err != nil { + return err + } + dp.Value = value + + for _, kv := range parts[3:] { + tagParts := strings.Split(kv, "=") + if len(tagParts) != 2 { + return fmt.Errorf("require tag format: ") + } + if dp.Tags == nil { + dp.Tags = make(map[string]string) + } + dp.Tags[tagParts[0]] = tagParts[1] + } + + return nil +} diff --git a/go/stats/opentsdb/datapoint_reader.go b/go/stats/opentsdb/datapoint_reader.go new file mode 100644 index 00000000000..441be9eb7a1 --- /dev/null +++ b/go/stats/opentsdb/datapoint_reader.go @@ -0,0 +1,53 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package opentsdb + +import ( + "bufio" + "io" +) + +var newLineDelimiter = byte('\n') + +// DataPointReader parses bytes from io.Reader into DataPoints. +type DataPointReader struct { + reader *bufio.Reader +} + +func NewDataPointReader(r io.Reader) *DataPointReader { + return &DataPointReader{ + reader: bufio.NewReader(r), + } +} + +// Read returns a DataPoint from the underlying io.Reader. +// +// Returns an error if no DataPoint could be parsed. +func (tr *DataPointReader) Read() (*DataPoint, error) { + bs, err := tr.reader.ReadBytes(newLineDelimiter) + if err != nil { + return nil, err + } + + dp := &DataPoint{} + + if err := unmarshalTextToData(dp, bs[:len(bs)-1]); err != nil { + return nil, err + } + + return dp, nil +} diff --git a/go/stats/opentsdb/doc.go b/go/stats/opentsdb/doc.go new file mode 100644 index 00000000000..88c22a58c70 --- /dev/null +++ b/go/stats/opentsdb/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package opentsdb adds support for pushing stats to opentsdb. +package opentsdb diff --git a/go/stats/opentsdb/file_writer.go b/go/stats/opentsdb/file_writer.go new file mode 100644 index 00000000000..7f2d2f2ccc7 --- /dev/null +++ b/go/stats/opentsdb/file_writer.go @@ -0,0 +1,52 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package opentsdb + +import ( + "io" + "os" +) + +type fileWriter struct { + writer io.WriteCloser +} + +func newFileWriter(path string) (writer, error) { + f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_SYNC|os.O_WRONLY, 0644) + if err != nil { + return nil, err + } + + return &fileWriter{ + writer: f, + }, nil +} + +func (fw *fileWriter) Write(data []*DataPoint) error { + for _, d := range data { + text, err := d.MarshalText() + if err != nil { + return err + } + + if _, err := fw.writer.Write([]byte(text)); err != nil { + return err + } + } + + return nil +} diff --git a/go/stats/opentsdb/flags.go b/go/stats/opentsdb/flags.go new file mode 100644 index 00000000000..8ccd0279981 --- /dev/null +++ b/go/stats/opentsdb/flags.go @@ -0,0 +1,38 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package opentsdb + +import ( + "github.com/spf13/pflag" + + "vitess.io/vitess/go/vt/servenv" +) + +var ( + openTSDBURI string +) + +func registerFlags(fs *pflag.FlagSet) { + fs.StringVar(&openTSDBURI, "opentsdb_uri", openTSDBURI, "URI of opentsdb /api/put method") +} + +func init() { + servenv.OnParseFor("vtbackup", registerFlags) + servenv.OnParseFor("vtctld", registerFlags) + servenv.OnParseFor("vtgate", registerFlags) + servenv.OnParseFor("vttablet", registerFlags) +} diff --git a/go/stats/opentsdb/http_writer.go b/go/stats/opentsdb/http_writer.go new file mode 100644 index 00000000000..7b7801d7f77 --- /dev/null +++ b/go/stats/opentsdb/http_writer.go @@ -0,0 +1,51 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package opentsdb + +import ( + "bytes" + "encoding/json" + "net/http" +) + +type httpWriter struct { + client *http.Client + uri string +} + +func newHTTPWriter(client *http.Client, uri string) *httpWriter { + return &httpWriter{ + client: client, + uri: uri, + } +} + +func (hw *httpWriter) Write(data []*DataPoint) error { + jsonb, err := json.Marshal(data) + if err != nil { + return err + } + + resp, err := hw.client.Post(hw.uri, "application/json", bytes.NewReader(jsonb)) + if err != nil { + return err + } + + resp.Body.Close() + + return nil +} diff --git a/go/stats/opentsdb/init.go b/go/stats/opentsdb/init.go new file mode 100644 index 00000000000..51186ad7650 --- /dev/null +++ b/go/stats/opentsdb/init.go @@ -0,0 +1,104 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package opentsdb + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "sort" + + "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/servenv" +) + +var singletonBackend stats.PushBackend + +// Init attempts to create a singleton *opentsdb.backend and register it as a PushBackend. +// If it fails to create one, this is a noop. The prefix argument is an optional string +// to prepend to the name of every data point reported. +func Init(prefix string) { + // Needs to happen in servenv.OnRun() instead of init because it requires flag parsing and logging + servenv.OnRun(func() { + log.Info("Initializing opentsdb backend...") + backend, err := InitWithoutServenv(prefix) + if err != nil { + log.Infof("Failed to initialize singleton opentsdb backend: %v", err) + } else { + singletonBackend = backend + log.Info("Initialized opentsdb backend.") + } + }) +} + +// InitWithoutServenv initializes the opentsdb without servenv +func InitWithoutServenv(prefix string) (stats.PushBackend, error) { + b, err := newBackend(prefix) + + if err != nil { + return nil, err + } + + stats.RegisterPushBackend("opentsdb", b) + + servenv.HTTPHandleFunc("/debug/opentsdb", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + collector := b.collector() + collector.collectAll() + data := collector.data + sort.Sort(byMetric(data)) + + if b, err := json.MarshalIndent(data, "", " "); err != nil { + w.Write([]byte(err.Error())) + } else { + w.Write(b) + } + }) + + return b, nil +} + +func newBackend(prefix string) (*backend, error) { + if openTSDBURI == "" { + return nil, fmt.Errorf("cannot create opentsdb PushBackend with empty --opentsdb_uri") + } + + var w writer + + // Use the file API when the uri is in format file://... 
+ u, err := url.Parse(openTSDBURI) + if err != nil { + return nil, fmt.Errorf("failed to parse --opentsdb_uri %s: %v", openTSDBURI, err) + } else if u.Scheme == "file" { + fw, err := newFileWriter(u.Path) + if err != nil { + return nil, fmt.Errorf("failed to create file-based writer for --opentsdb_uri %s: %v", openTSDBURI, err) + } else { + w = fw + } + } else { + w = newHTTPWriter(&http.Client{}, openTSDBURI) + } + + return &backend{ + prefix: prefix, + commonTags: stats.ParseCommonTags(stats.CommonTags), + writer: w, + }, nil +} diff --git a/go/stats/opentsdb/opentsdb_test.go b/go/stats/opentsdb/opentsdb_test.go index 0e8ff240500..940ee845ada 100644 --- a/go/stats/opentsdb/opentsdb_test.go +++ b/go/stats/opentsdb/opentsdb_test.go @@ -352,15 +352,16 @@ func TestOpenTsdbTimings(t *testing.T) { } func checkOutput(t *testing.T, statName string, wantJSON string) { - backend := &openTSDBBackend{ + b := &backend{ prefix: "vtgate", commonTags: map[string]string{"host": "localhost"}, } timestamp := int64(1234) - dc := &dataCollector{ - settings: backend, - timestamp: timestamp, + dc := &collector{ + commonTags: b.commonTags, + prefix: b.prefix, + timestamp: timestamp, } found := false expvar.Do(func(kv expvar.KeyValue) { @@ -368,9 +369,9 @@ func checkOutput(t *testing.T, statName string, wantJSON string) { found = true dc.addExpVar(kv) - sort.Sort(byMetric(dc.dataPoints)) + sort.Sort(byMetric(dc.data)) - gotBytes, err := json.MarshalIndent(dc.dataPoints, "", " ") + gotBytes, err := json.MarshalIndent(dc.data, "", " ") if err != nil { t.Errorf("Failed to marshal json: %v", err) return diff --git a/go/stats/opentsdb/writer.go b/go/stats/opentsdb/writer.go new file mode 100644 index 00000000000..49d221cc782 --- /dev/null +++ b/go/stats/opentsdb/writer.go @@ -0,0 +1,21 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package opentsdb + +type writer interface { + Write([]*DataPoint) error +} diff --git a/go/stats/statsd/statsd.go b/go/stats/statsd/statsd.go index 269b185ff7c..cfc461de473 100644 --- a/go/stats/statsd/statsd.go +++ b/go/stats/statsd/statsd.go @@ -219,7 +219,7 @@ func (sb StatsBackend) addExpVar(kv expvar.KeyValue) { } } -// PushAll flush out the pending metrics +// PushAll flushes out the pending metrics func (sb StatsBackend) PushAll() error { expvar.Do(func(kv expvar.KeyValue) { sb.addExpVar(kv) @@ -229,3 +229,15 @@ func (sb StatsBackend) PushAll() error { } return nil } + +// PushOne pushes the single provided metric. 
+func (sb StatsBackend) PushOne(name string, v expvar.Var) error { + sb.addExpVar(expvar.KeyValue{ + Key: name, + Value: v, + }) + if err := sb.statsdClient.Flush(); err != nil { + return err + } + return nil +} diff --git a/go/test/endtoend/backup/vtbackup/backup_only_test.go b/go/test/endtoend/backup/vtbackup/backup_only_test.go index 408cc64a21b..f590cd672c9 100644 --- a/go/test/endtoend/backup/vtbackup/backup_only_test.go +++ b/go/test/endtoend/backup/vtbackup/backup_only_test.go @@ -19,7 +19,9 @@ package vtbackup import ( "context" "encoding/json" + "errors" "fmt" + "io" "os" "path" "strings" @@ -30,6 +32,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/stats/opentsdb" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" @@ -62,6 +65,9 @@ func TestTabletInitialBackup(t *testing.T) { vtBackup(t, true, false, false) verifyBackupCount(t, shardKsName, 1) + dataPointReader := openTSDBDataPointReader(t) + verifyBackupStats(t, dataPointReader, true /* initialBackup */) + // Initialize the tablets initTablets(t, false, false) @@ -88,7 +94,7 @@ func TestTabletInitialBackup(t *testing.T) { restore(t, replica1, "replica", "SERVING") // Run the entire backup test - firstBackupTest(t, "replica") + firstBackupTest(t, "replica", dataPointReader) tearDown(t, true) } @@ -110,12 +116,12 @@ func TestTabletBackupOnly(t *testing.T) { replica1.VttabletProcess.ServingStatus = "NOT_SERVING" initTablets(t, true, true) - firstBackupTest(t, "replica") + firstBackupTest(t, "replica", nil) tearDown(t, false) } -func firstBackupTest(t *testing.T, tabletType string) { +func firstBackupTest(t *testing.T, tabletType string, dataPointReader *opentsdb.DataPointReader) { // Test First Backup flow. // // firstBackupTest will: @@ -150,6 +156,12 @@ func firstBackupTest(t *testing.T, tabletType string) { // check that the backup shows up in the listing verifyBackupCount(t, shardKsName, len(backups)+1) + // check that backup stats are what we expect + if dataPointReader == nil { + dataPointReader = openTSDBDataPointReader(t) + } + verifyBackupStats(t, dataPointReader, false /* initialBackup */) + // insert more data on the primary _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) require.Nil(t, err) @@ -183,6 +195,11 @@ func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedo "--allow_first_backup", "--db-credentials-file", dbCredentialFile, "--mysql_socket", mysqlSocket.Name(), + + // Use opentsdb for stats. + "--stats_backend", "opentsdb", + // Write stats to file for reading afterwards. + "--opentsdb_uri", fmt.Sprintf("file://%s", openTSDBFilePath(t)), } if restartBeforeBackup { extraArgs = append(extraArgs, "--restart_before_backup") @@ -413,3 +430,83 @@ func waitForReplicationToCatchup(tablets []cluster.Vttablet) bool { } } } + +func openTSDBDataPointReader(t *testing.T) *opentsdb.DataPointReader { + f, err := os.OpenFile(openTSDBFilePath(t), os.O_RDONLY, 0) + require.NoError(t, err) + return opentsdb.NewDataPointReader(f) +} + +func openTSDBFilePath(t *testing.T) string { + return path.Join(localCluster.TmpDirectory, fmt.Sprintf("opentsdb.%s.txt", t.Name())) +} + +func verifyBackupStats(t *testing.T, dataPointReader *opentsdb.DataPointReader, initialBackup bool) { + // During execution, the following phases will become active, in order. 
+ var expectActivePhases []string + if initialBackup { + expectActivePhases = []string{ + "initialbackup", + } + } else { + expectActivePhases = []string{ + "restorelastbackup", + "catchupreplication", + "takenewbackup", + } + } + + // Sequence of phase activity. + activePhases := make([]string, 0) + + // Last seen phase values. + phaseValues := make(map[string]int64) + + // Scan for phase activity until all we're out of stats to scan. + for dataPoint, err := dataPointReader.Read(); !errors.Is(err, io.EOF); dataPoint, err = dataPointReader.Read() { + // We're only interested in "vtbackup.phase" metrics in this test. + if dataPoint.Metric != "vtbackup.phase" { + continue + } + + phase := dataPoint.Tags["phase"] + value := int64(dataPoint.Value) + lastValue, ok := phaseValues[phase] + + // The value should always be 0 or 1. + require.True(t, int64(0) == value || int64(1) == value) + + // The first time the phase is reported, it should be 0. + if !ok { + require.Equal(t, int64(0), value) + } + + // Eventually the phase should go active. The next time it reports, + // it should go inactive. + if lastValue == 1 { + require.Equal(t, int64(0), value) + } + + // Record current value. + phaseValues[phase] = value + + // Add phase to sequence once it goes from active to inactive. + if lastValue == 1 && value == 0 { + activePhases = append(activePhases, phase) + } + + // Verify at most one phase is active. + activeCount := 0 + for _, value := range phaseValues { + if value == int64(0) { + continue + } + + activeCount++ + require.LessOrEqual(t, activeCount, 1) + } + } + + // Verify phase sequences. + require.Equal(t, expectActivePhases, activePhases) +} diff --git a/go/test/endtoend/cluster/vtbackup_process.go b/go/test/endtoend/cluster/vtbackup_process.go index be75026bf0d..13b48dca7c4 100644 --- a/go/test/endtoend/cluster/vtbackup_process.go +++ b/go/test/endtoend/cluster/vtbackup_process.go @@ -69,8 +69,7 @@ func (vtbackup *VtbackupProcess) Setup() (err error) { //Backup Arguments are not optional "--backup_storage_implementation", "file", - "--file_backup_storage_root", - path.Join(os.Getenv("VTDATAROOT"), "tmp", "backupstorage"), + "--file_backup_storage_root", path.Join(os.Getenv("VTDATAROOT"), "tmp", "backupstorage"), ) if vtbackup.initialBackup { From 8a7e559a80fe6d5155ca7aea81b08bb9255a7b78 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Mon, 29 May 2023 13:30:49 -0400 Subject: [PATCH 04/10] update flags Signed-off-by: Max Englander --- go/flags/endtoend/vtbackup.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/go/flags/endtoend/vtbackup.txt b/go/flags/endtoend/vtbackup.txt index aa9c98ea666..dd86fb00d3a 100644 --- a/go/flags/endtoend/vtbackup.txt +++ b/go/flags/endtoend/vtbackup.txt @@ -133,6 +133,7 @@ Usage of vtbackup: --mysql_server_version string MySQL server version to advertise. 
(default "8.0.30-Vitess") --mysql_socket string path to the mysql socket --mysql_timeout duration how long to wait for mysqld startup (default 5m0s) + --opentsdb_uri string URI of opentsdb /api/put method --port int port for the server --pprof strings enable profiling --purge_logs_interval duration how often try to remove old logs (default 1h0m0s) From 8e759c9bdf9d77317682664500cf6883f99a9482 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Mon, 29 May 2023 13:40:31 -0400 Subject: [PATCH 05/10] use stats.Variable iface instead expvar.Var Signed-off-by: Max Englander --- go/stats/export.go | 4 ++-- go/stats/opentsdb/backend.go | 5 +++-- go/stats/statsd/statsd.go | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/go/stats/export.go b/go/stats/export.go index bdca24100ba..8bda85c87b2 100644 --- a/go/stats/export.go +++ b/go/stats/export.go @@ -129,7 +129,7 @@ func pushAll() error { return backend.PushAll() } -func pushOne(name string, v expvar.Var) error { +func pushOne(name string, v Variable) error { backend, ok := pushBackends[statsBackend] if !ok { return fmt.Errorf("no PushBackend registered with name %s", statsBackend) @@ -202,7 +202,7 @@ type PushBackend interface { // PushAll pushes all stats from expvar to the backend. PushAll() error // PushOne pushes a single stat from expvar to the backend. - PushOne(name string, v expvar.Var) error + PushOne(name string, v Variable) error } var pushBackends = make(map[string]PushBackend) diff --git a/go/stats/opentsdb/backend.go b/go/stats/opentsdb/backend.go index 04de028e33b..e5c766ba797 100644 --- a/go/stats/opentsdb/backend.go +++ b/go/stats/opentsdb/backend.go @@ -17,8 +17,9 @@ limitations under the License. package opentsdb import ( - "expvar" "time" + + "vitess.io/vitess/go/stats" ) // backend implements stats.PushBackend @@ -42,7 +43,7 @@ func (b *backend) PushAll() error { } // PushOne pushes a single stat to OpenTSDB -func (b *backend) PushOne(name string, v expvar.Var) error { +func (b *backend) PushOne(name string, v stats.Variable) error { collector := b.collector() collector.collectOne(name, v) return b.writer.Write(collector.data) diff --git a/go/stats/statsd/statsd.go b/go/stats/statsd/statsd.go index cfc461de473..f791d7b742d 100644 --- a/go/stats/statsd/statsd.go +++ b/go/stats/statsd/statsd.go @@ -231,7 +231,7 @@ func (sb StatsBackend) PushAll() error { } // PushOne pushes the single provided metric. 
-func (sb StatsBackend) PushOne(name string, v expvar.Var) error { +func (sb StatsBackend) PushOne(name string, v stats.Variable) error { sb.addExpVar(expvar.KeyValue{ Key: name, Value: v, From 19e1e32eb7327e4882978d9841e376c9bba9e9a1 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Mon, 29 May 2023 14:54:32 -0400 Subject: [PATCH 06/10] try fix upgrade/downgrade test Signed-off-by: Max Englander --- .../backup/vtbackup/backup_only_test.go | 37 +++++++------------ 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/go/test/endtoend/backup/vtbackup/backup_only_test.go b/go/test/endtoend/backup/vtbackup/backup_only_test.go index f590cd672c9..5e80d5d3cc3 100644 --- a/go/test/endtoend/backup/vtbackup/backup_only_test.go +++ b/go/test/endtoend/backup/vtbackup/backup_only_test.go @@ -62,10 +62,8 @@ func TestTabletInitialBackup(t *testing.T) { waitForReplicationToCatchup([]cluster.Vttablet{*replica1, *replica2}) - vtBackup(t, true, false, false) + dataPointReader := vtBackup(t, true, false, false) verifyBackupCount(t, shardKsName, 1) - - dataPointReader := openTSDBDataPointReader(t) verifyBackupStats(t, dataPointReader, true /* initialBackup */) // Initialize the tablets @@ -94,7 +92,7 @@ func TestTabletInitialBackup(t *testing.T) { restore(t, replica1, "replica", "SERVING") // Run the entire backup test - firstBackupTest(t, "replica", dataPointReader) + firstBackupTest(t, "replica") tearDown(t, true) } @@ -116,12 +114,12 @@ func TestTabletBackupOnly(t *testing.T) { replica1.VttabletProcess.ServingStatus = "NOT_SERVING" initTablets(t, true, true) - firstBackupTest(t, "replica", nil) + firstBackupTest(t, "replica") tearDown(t, false) } -func firstBackupTest(t *testing.T, tabletType string, dataPointReader *opentsdb.DataPointReader) { +func firstBackupTest(t *testing.T, tabletType string) { // Test First Backup flow. // // firstBackupTest will: @@ -150,16 +148,12 @@ func firstBackupTest(t *testing.T, tabletType string, dataPointReader *opentsdb. // backup the replica log.Infof("taking backup %s", time.Now()) - vtBackup(t, false, true, true) + dataPointReader := vtBackup(t, false, true, true) log.Infof("done taking backup %s", time.Now()) // check that the backup shows up in the listing verifyBackupCount(t, shardKsName, len(backups)+1) - // check that backup stats are what we expect - if dataPointReader == nil { - dataPointReader = openTSDBDataPointReader(t) - } verifyBackupStats(t, dataPointReader, false /* initialBackup */) // insert more data on the primary @@ -185,11 +179,14 @@ func firstBackupTest(t *testing.T, tabletType string, dataPointReader *opentsdb. verifyBackupCount(t, shardKsName, 0) } -func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedoLog bool) { +func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedoLog bool) *opentsdb.DataPointReader { mysqlSocket, err := os.CreateTemp("", "vtbackup_test_mysql.sock") require.Nil(t, err) defer os.Remove(mysqlSocket.Name()) + // Prepare opentsdb stats file path. + statsPath := path.Join(t.TempDir(), fmt.Sprintf("opentsdb.%s.txt", t.Name())) + // Take the back using vtbackup executable extraArgs := []string{ "--allow_first_backup", @@ -199,7 +196,7 @@ func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedo // Use opentsdb for stats. "--stats_backend", "opentsdb", // Write stats to file for reading afterwards. 
- "--opentsdb_uri", fmt.Sprintf("file://%s", openTSDBFilePath(t)), + "--opentsdb_uri", fmt.Sprintf("file://%s", statsPath), } if restartBeforeBackup { extraArgs = append(extraArgs, "--restart_before_backup") @@ -218,6 +215,10 @@ func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedo log.Infof("starting backup tablet %s", time.Now()) err = localCluster.StartVtbackup(newInitDBFile, initialBackup, keyspaceName, shardName, cell, extraArgs...) require.Nil(t, err) + + f, err := os.OpenFile(statsPath, os.O_RDONLY, 0) + require.NoError(t, err) + return opentsdb.NewDataPointReader(f) } func verifyBackupCount(t *testing.T, shardKsName string, expected int) []string { @@ -431,16 +432,6 @@ func waitForReplicationToCatchup(tablets []cluster.Vttablet) bool { } } -func openTSDBDataPointReader(t *testing.T) *opentsdb.DataPointReader { - f, err := os.OpenFile(openTSDBFilePath(t), os.O_RDONLY, 0) - require.NoError(t, err) - return opentsdb.NewDataPointReader(f) -} - -func openTSDBFilePath(t *testing.T) string { - return path.Join(localCluster.TmpDirectory, fmt.Sprintf("opentsdb.%s.txt", t.Name())) -} - func verifyBackupStats(t *testing.T, dataPointReader *opentsdb.DataPointReader, initialBackup bool) { // During execution, the following phases will become active, in order. var expectActivePhases []string From ad61e103893cd8f3e7a8dd8f3ed1a984343194f4 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Tue, 20 Jun 2023 16:43:59 -0400 Subject: [PATCH 07/10] Revert "update release notes" This reverts commit bb37b272c0c5e799bf49decd0c0e414c392fca2d. Signed-off-by: Max Englander --- changelog/17.0/17.0.0/summary.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/changelog/17.0/17.0.0/summary.md b/changelog/17.0/17.0.0/summary.md index aa58ae6b89f..fdb1763a412 100644 --- a/changelog/17.0/17.0.0/summary.md +++ b/changelog/17.0/17.0.0/summary.md @@ -210,11 +210,11 @@ These operations are counted and timed, and the number of bytes consumed or prod Vtbackup exports some metrics which are not available elsewhere. -**Phase** +**DurationByPhaseSeconds** Vtbackup fetches the last backup, restores it to an empty mysql installation, replicates recent changes into that installation, and then takes a backup of that installation. -_Phase_ a binary-valued gauge that reports the currently active phase. +_DurationByPhaseSeconds_ exports timings for these individual phases. ##### Example @@ -250,11 +250,11 @@ _Phase_ a binary-valued gauge that reports the currently active phase. "BackupEngine.Builtin.Destination:Close": 17144630, "BackupStorage.File.File:Write": 10743169 }, - "Phase": { - "InitMySQLd": 0, - "RestoreLastBackup": 0, - "CatchUpReplication": 0, - "TakeNewBackup": 0 + "DurationByPhaseSeconds": { + "InitMySQLd": 2, + "RestoreLastBackup": 6, + "CatchUpReplication": 1, + "TakeNewBackup": 4 }, "RestoreBytes": { "BackupEngine.Builtin.Source:Read": 1095, @@ -290,8 +290,8 @@ _Phase_ a binary-valued gauge that reports the currently active phase. Some notes to help understand these metrics: * `BackupBytes["BackupStorage.File.File:Write"]` measures how many bytes were read from disk by the `file` Backup Storage implementation during the backup phase. -* `Phase["CatchUpReplication"]` reports whether the catch-up replication phase is active (1) or not (0). -* `Phase["RestoreLastBackup"]` reports whether the restore last backup phase is active (1) or not (0). 
+* `DurationByPhaseSeconds["CatchUpReplication"]` measures how long it took to catch-up replication after the restore phase. +* `DurationByPhaseSeconds["RestoreLastBackup"]` measures to the duration of the restore phase. * `RestoreDurationNanoseconds["-.-.Restore"]` also measures to the duration of the restore phase. #### VTTablet error count with error code From ff22dc3edcd5d063cde05860e32b360e5351ad7a Mon Sep 17 00:00:00 2001 From: Max Englander Date: Tue, 20 Jun 2023 16:44:10 -0400 Subject: [PATCH 08/10] update v18 release notes Signed-off-by: Max Englander --- changelog/18.0/18.0.0/summary.md | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/changelog/18.0/18.0.0/summary.md b/changelog/18.0/18.0.0/summary.md index 2171e02b31c..b797c0a1e2a 100644 --- a/changelog/18.0/18.0.0/summary.md +++ b/changelog/18.0/18.0.0/summary.md @@ -4,8 +4,11 @@ - **[Major Changes](#major-changes)** - **[Breaking Changes](#breaking-changes)** + - [VTBackup stat `DurationByPhase` removed](#remove-vtbackup-stat-duration-by-phase) - **[New command line flags and behavior](#new-flag)** - [VTOrc flag `--allow-emergency-reparent`](#new-flag-toggle-ers) + - **[New stats](#new-stats)** + - [VTBackup stat `Phase`](#vtbackup-stat-phase) - **[VTAdmin](#vtadmin)** - [Updated to node v18.16.0](#update-node) - **[Deprecations and Deletions](#deprecations-and-deletions)** @@ -16,6 +19,10 @@ ### Breaking Changes +#### VTbackup stat `DurationByPhase` removed + +VTBackup stat `DurationByPhase` is removed. Use the binary-valued `Phase` stat instead. + ### New command line flags and behavior #### VTOrc flag `--allow-emergency-reparent` @@ -24,6 +31,25 @@ VTOrc has a new flag `--allow-emergency-reparent` that allows the users to toggl The users that want VTOrc to fix the replication issues, but don't want it to run any reparents should start using this flag. By default, VTOrc will be able to run `EmergencyReparentShard`. The users must specify the flag to `false` to change the behaviour. +### New stats + +#### VTBackup `Phase` stat + +In v17, the `vtbackup` stat `DurationByPhase` stat was added measuring the time spent by `vtbackup` in each phase. This stat turned out to be awkward to use in production, and has been replaced in v18 by a binary-valued `Phase` stat. + +`Phase` reports a 1 (active) or a 0 (inactive) for each of the following phases: + + * `CatchUpReplication` + * `InitialBackup` + * `RestoreLastBackup` + * `TakeNewBackup` + +To calculate how long `vtbackup` has spent in a given phase, sum the 1-valued data points over time and multiply by the data collection or reporting interval. For example, in Prometheus: + +``` +sum_over_time(vtbackup_phase{phase="TakeNewBackup"}) * +``` + ### VTAdmin #### vtadmin-web updated to node v18.16.0 (LTS) @@ -38,4 +64,4 @@ The `k8stopo` has been deprecated in Vitess 17, also see https://github.com/vite #### Deleted `vtgr` -The `vtgr` has been deprecated in Vitess 17, also see https://github.com/vitessio/vitess/issues/13300. With Vitess 18 `vtgr` has been removed. \ No newline at end of file +The `vtgr` has been deprecated in Vitess 17, also see https://github.com/vitessio/vitess/issues/13300. With Vitess 18 `vtgr` has been removed. 
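The `Phase` release note added in the patch above says phase time can be recovered by summing the gauge's 1-valued data points and multiplying by the collection interval. Written out in full against an assumed 15-second scrape interval and a one-hour window, that Prometheus query would presumably look like `sum_over_time(vtbackup_phase{phase="TakeNewBackup"}[1h]) * 15`; the window and interval here are illustrative, not taken from the patch. The Go sketch below performs the same estimate offline, given scraped samples of the binary gauge; the sample layout and names are assumptions for illustration only, not code from this series.

```go
package main

import (
	"fmt"
	"time"
)

// sample is one scraped data point of a binary phase gauge such as
// vtbackup's Phase: 1 while the phase is active, 0 otherwise.
type sample struct {
	at    time.Time
	value int64
}

// activeDuration estimates how long the phase was active by counting the
// 1-valued samples and multiplying by the collection interval, mirroring
// the calculation described in the release note.
func activeDuration(samples []sample, interval time.Duration) time.Duration {
	var active int64
	for _, s := range samples {
		if s.value == 1 {
			active++
		}
	}
	return time.Duration(active) * interval
}

func main() {
	// Hypothetical 15s scrapes of vtbackup_phase{phase="TakeNewBackup"}:
	// the phase turns on for three consecutive scrapes, then off again.
	interval := 15 * time.Second
	start := time.Now()
	samples := []sample{
		{at: start, value: 0},
		{at: start.Add(1 * interval), value: 1},
		{at: start.Add(2 * interval), value: 1},
		{at: start.Add(3 * interval), value: 1},
		{at: start.Add(4 * interval), value: 0},
	}
	fmt.Println(activeDuration(samples, interval)) // 45s: three active samples x 15s
}
```

Because the gauge only reports 0 or 1, the estimate's resolution is bounded by the scrape interval, and phases shorter than one interval may be missed entirely; that is the trade-off of replacing the `DurationByPhaseSeconds` timings with a sampled binary gauge.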
From fb5fe50b13e14e91c46a15afbcdfad7759d7b7b1 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Tue, 19 Sep 2023 13:02:41 +0100 Subject: [PATCH 09/10] deprecate not deletE Signed-off-by: Max Englander --- changelog/18.0/18.0.0/summary.md | 10 +++++----- go/cmd/vtbackup/vtbackup.go | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/changelog/18.0/18.0.0/summary.md b/changelog/18.0/18.0.0/summary.md index 26e01ef8fe0..6b996c5ed0f 100644 --- a/changelog/18.0/18.0.0/summary.md +++ b/changelog/18.0/18.0.0/summary.md @@ -5,7 +5,6 @@ - **[Major Changes](#major-changes)** - **[Breaking Changes](#breaking-changes)** - [Local examples now use etcd v3 storage and API](#local-examples-etcd-v3) - - [VTBackup stat `DurationByPhase` removed](#remove-vtbackup-stat-duration-by-phase) - **[New command line flags and behavior](#new-flag)** - [VTOrc flag `--allow-emergency-reparent`](#new-flag-toggle-ers) - [VTOrc flag `--change-tablets-with-errant-gtid-to-drained`](#new-flag-errant-gtid-convert) @@ -17,6 +16,7 @@ - [Deleted `V3` planner](#deleted-v3) - [Deleted `k8stopo`](#deleted-k8stopo) - [Deleted `vtgr`](#deleted-vtgr) + - [Deprecated VTBackup stat `DurationByPhase`](#deprecated-vtbackup-stat-duration-by-phase) - **[New stats](#new-stats)** - [VTGate Vindex unknown parameters](#vtgate-vindex-unknown-parameters) - [VTBackup stat `Phase`](#vtbackup-stat-phase) @@ -41,10 +41,6 @@ removed this legacy etcd usage and instead use the new (default) etcd v3 storage examples in any sort of long-term non-testing capacity, then you will need to explicitly use the v2 storage and API mode or [migrate your existing data from v2 to v3](https://etcd.io/docs/v3.5/tutorials/how-to-migrate/). -#### VTbackup stat `DurationByPhase` removed - -VTBackup stat `DurationByPhase` is removed. Use the binary-valued `Phase` stat instead. - ### New command line flags and behavior #### VTOrc flag `--allow-emergency-reparent` @@ -121,6 +117,10 @@ the `k8stopo` has been removed. The `vtgr` has been deprecated in Vitess 17, also see https://github.com/vitessio/vitess/issues/13300. With Vitess 18 `vtgr` has been removed. +#### Deprecated VTbackup stat `DurationByPhase` + +VTBackup stat `DurationByPhase` is deprecated. Use the binary-valued `Phase` stat instead. + ### New stats #### VTGate Vindex unknown parameters diff --git a/go/cmd/vtbackup/vtbackup.go b/go/cmd/vtbackup/vtbackup.go index 33cbaa33e18..ce61735d767 100644 --- a/go/cmd/vtbackup/vtbackup.go +++ b/go/cmd/vtbackup/vtbackup.go @@ -130,6 +130,14 @@ var ( detachedMode bool keepAliveTimeout = 0 * time.Second disableRedoLog = false + + // Deprecated, use "Phase" instead. + deprecatedDurationByPhase = stats.NewGaugesWithSingleLabel( + "DurationByPhaseSeconds", + "[DEPRECATED] How long it took vtbackup to perform each phase (in seconds).", + "phase", + ) + // This gauge is updated 3*N times during the course of a vtbackup run, // where N is the number of different phases vtbackup transitions through. 
// Once to initialize to 0, another time to set the phase to active (1), @@ -312,9 +320,11 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back } initCtx, initCancel := context.WithTimeout(ctx, mysqlTimeout) defer initCancel() + initMysqldAt := time.Now() if err := mysqld.Init(initCtx, mycnf, initDBSQLFile); err != nil { return fmt.Errorf("failed to initialize mysql data dir and start mysqld: %v", err) } + deprecatedDurationByPhase.Set("InitMySQLd", int64(time.Since(initMysqldAt).Seconds())) // Shut down mysqld when we're done. defer func() { // Be careful not to use the original context, because we don't want to @@ -376,12 +386,14 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back return err } + backupParams.BackupTime = time.Now() // Now we're ready to take the backup. phase.Set(phaseNameInitialBackup, int64(1)) defer phase.Set(phaseNameInitialBackup, int64(0)) if err := mysqlctl.Backup(ctx, backupParams); err != nil { return fmt.Errorf("backup failed: %v", err) } + deprecatedDurationByPhase.Set("InitialBackup", int64(time.Since(backupParams.BackupTime).Seconds())) log.Info("Initial backup successful.") phase.Set(phaseNameInitialBackup, int64(0)) return nil @@ -391,6 +403,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back defer phase.Set(phaseNameRestoreLastBackup, int64(0)) backupDir := mysqlctl.GetBackupDir(initKeyspace, initShard) log.Infof("Restoring latest backup from directory %v", backupDir) + restoreAt := time.Now() params := mysqlctl.RestoreParams{ Cnf: mycnf, Mysqld: mysqld, @@ -419,6 +432,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back default: return fmt.Errorf("can't restore from backup: %v", err) } + deprecatedDurationByPhase.Set("RestoreLastBackup", int64(time.Since(restoreAt).Seconds())) phase.Set(phaseNameRestoreLastBackup, int64(0)) // As of MySQL 8.0.21, you can disable redo logging using the ALTER INSTANCE @@ -504,6 +518,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back // We're caught up on replication to at least the point the primary // was at when this vtbackup run started. log.Infof("Replication caught up to %v after %v", status.Position, time.Since(waitStartTime)) + deprecatedDurationByPhase.Set("CatchUpReplication", int64(time.Since(waitStartTime).Seconds())) break } if !lastStatus.Position.IsZero() { @@ -550,6 +565,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back } if restartBeforeBackup { + restartAt := time.Now() log.Info("Proceeding with clean MySQL shutdown and startup to flush all buffers.") // Prep for full/clean shutdown (not typically the default) if err := mysqld.ExecuteSuperQuery(ctx, "SET GLOBAL innodb_fast_shutdown=0"); err != nil { @@ -563,14 +579,17 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back if err := mysqld.Start(ctx, mycnf); err != nil { return fmt.Errorf("Could not start MySQL after full shutdown: %v", err) } + deprecatedDurationByPhase.Set("RestartBeforeBackup", int64(time.Since(restartAt).Seconds())) } // Now we can take a new backup. 
+ backupAt := time.Now() phase.Set(phaseNameTakeNewBackup, int64(1)) defer phase.Set(phaseNameTakeNewBackup, int64(0)) if err := mysqlctl.Backup(ctx, backupParams); err != nil { return fmt.Errorf("error taking backup: %v", err) } + deprecatedDurationByPhase.Set("TakeNewBackup", int64(time.Since(backupAt).Seconds())) phase.Set(phaseNameTakeNewBackup, int64(0)) // Return a non-zero exit code if we didn't meet the replication position From d8e54a31b764ffd7c7afefd2b7e630b720fac2ec Mon Sep 17 00:00:00 2001 From: Max Englander Date: Wed, 20 Sep 2023 00:25:48 +0100 Subject: [PATCH 10/10] CatchUpReplication -> CatchupReplication Signed-off-by: Max Englander --- changelog/18.0/18.0.0/summary.md | 4 ++-- go/cmd/vtbackup/vtbackup.go | 32 ++++++++++++++++---------------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/changelog/18.0/18.0.0/summary.md b/changelog/18.0/18.0.0/summary.md index 6b996c5ed0f..db2494d3794 100644 --- a/changelog/18.0/18.0.0/summary.md +++ b/changelog/18.0/18.0.0/summary.md @@ -133,7 +133,7 @@ In v17, the `vtbackup` stat `DurationByPhase` stat was added measuring the time `Phase` reports a 1 (active) or a 0 (inactive) for each of the following phases: - * `CatchUpReplication` + * `CatchupReplication` * `InitialBackup` * `RestoreLastBackup` * `TakeNewBackup` @@ -147,7 +147,7 @@ sum_over_time(vtbackup_phase{phase="TakeNewBackup"}) * `PhaseStatus` reports a 1 (active) or a 0 (inactive) for each of the following phases and statuses: - * `CatchUpReplication` phase has statuses `Stalled` and `Stopped`. + * `CatchupReplication` phase has statuses `Stalled` and `Stopped`. * `Stalled` is set to `1` when replication stops advancing. * `Stopped` is set to `1` when replication stops before `vtbackup` catches up with the primary. diff --git a/go/cmd/vtbackup/vtbackup.go b/go/cmd/vtbackup/vtbackup.go index ce61735d767..ebf83526cad 100644 --- a/go/cmd/vtbackup/vtbackup.go +++ b/go/cmd/vtbackup/vtbackup.go @@ -100,12 +100,12 @@ const ( // forever for things that should be quick. operationTimeout = 1 * time.Minute - phaseNameCatchUpReplication = "CatchUpReplication" + phaseNameCatchupReplication = "CatchupReplication" phaseNameInitialBackup = "InitialBackup" phaseNameRestoreLastBackup = "RestoreLastBackup" phaseNameTakeNewBackup = "TakeNewBackup" - phaseStatusCatchUpReplicationStalled = "Stalled" - phaseStatusCatchUpReplicationStopped = "Stopped" + phaseStatusCatchupReplicationStalled = "Stalled" + phaseStatusCatchupReplicationStopped = "Stopped" ) var ( @@ -154,7 +154,7 @@ var ( "phase", ) phaseNames = []string{ - phaseNameCatchUpReplication, + phaseNameCatchupReplication, phaseNameInitialBackup, phaseNameRestoreLastBackup, phaseNameTakeNewBackup, @@ -165,9 +165,9 @@ var ( []string{"phase", "status"}, ) phaseStatuses = map[string][]string{ - phaseNameCatchUpReplication: { - phaseStatusCatchUpReplicationStalled, - phaseStatusCatchUpReplicationStopped, + phaseNameCatchupReplication: { + phaseStatusCatchupReplicationStalled, + phaseStatusCatchupReplicationStopped, }, } ) @@ -491,8 +491,8 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back backupParams.BackupTime = time.Now() // Wait for replication to catch up. 
- phase.Set(phaseNameCatchUpReplication, int64(1)) - defer phase.Set(phaseNameCatchUpReplication, int64(0)) + phase.Set(phaseNameCatchupReplication, int64(1)) + defer phase.Set(phaseNameCatchupReplication, int64(0)) var ( lastStatus replication.ReplicationStatus @@ -523,22 +523,22 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back } if !lastStatus.Position.IsZero() { if status.Position.Equal(lastStatus.Position) { - phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStalled}, 1) + phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStalled}, 1) } else { - phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStalled}, 0) + phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStalled}, 0) } } if !status.Healthy() { log.Warning("Replication has stopped before backup could be taken. Trying to restart replication.") - phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStopped}, 1) + phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStopped}, 1) if err := startReplication(ctx, mysqld, topoServer); err != nil { log.Warningf("Failed to restart replication: %v", err) } } else { - phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStopped}, 0) + phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStopped}, 0) } } - phase.Set(phaseNameCatchUpReplication, int64(0)) + phase.Set(phaseNameCatchupReplication, int64(0)) // Stop replication and see where we are. if err := mysqld.StopReplication(nil); err != nil { @@ -554,8 +554,8 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back if !status.Position.AtLeast(primaryPos) && status.Position.Equal(restorePos) { return fmt.Errorf("not taking backup: replication did not make any progress from restore point: %v", restorePos) } - phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStalled}, 0) - phaseStatus.Set([]string{phaseNameCatchUpReplication, phaseStatusCatchUpReplicationStopped}, 0) + phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStalled}, 0) + phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStopped}, 0) // Re-enable redo logging. if disabledRedoLog {