diff --git a/CHANGELOG.md b/CHANGELOG.md index 76b41e71cf41..64dced2c1cf3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,13 @@ this platform. FreeBSD builds will return in a future release. file. This was prevented by accidentally writing to a readonly volume mount. (@rfratto) +- [ENHANCEMENT] `wal_cleanup_age` and `wal_cleanup_period` have been added to the + top-level Prometheus configuration section. These settings control how Write Ahead + Logs (WALs) that are not associated with any instances are cleaned up. By default, + WALs not associated with an instance that have not been written in the last 12 hours + are eligible to be cleaned up. This cleanup can be disabled by setting `wal_cleanup_period` + to `0`. (#304) (@56quarters) + # v0.9.1 (2021-01-04) NOTE: FreeBSD builds will not be included for this release. There is a bug in an diff --git a/docs/configuration-reference.md b/docs/configuration-reference.md index cef3f6663d60..dd5e89459044 100644 --- a/docs/configuration-reference.md +++ b/docs/configuration-reference.md @@ -153,6 +153,14 @@ define one instance. # Configure the directory used by instances to store their WAL. [wal_directory: | default = ""] +# Configures how long ago an abandoned (not associated with an instance) WAL +# may be written to before being eligible to be deleted +[wal_cleanup_age: | default = "12h"] + +# Configures how often checks for abandoned WALs to be deleted are performed. +# A value of 0 disables periodic cleanup of abandoned WALs +[wal_cleanup_period: | default = "30m"] + # The list of Prometheus instances to launch with the agent. 
configs: [- ] diff --git a/pkg/prom/agent.go b/pkg/prom/agent.go index b3b9615c3114..69bdd9ae80db 100644 --- a/pkg/prom/agent.go +++ b/pkg/prom/agent.go @@ -23,6 +23,8 @@ var ( DefaultConfig = Config{ Global: config.DefaultGlobalConfig, InstanceRestartBackoff: instance.DefaultBasicManagerConfig.InstanceRestartBackoff, + WALCleanupAge: DefaultCleanupAge, + WALCleanupPeriod: DefaultCleanupPeriod, ServiceConfig: ha.DefaultConfig, ServiceClientConfig: client.DefaultConfig, InstanceMode: DefaultInstanceMode, @@ -70,6 +72,8 @@ type Config struct { Global config.GlobalConfig `yaml:"global"` WALDir string `yaml:"wal_directory"` + WALCleanupAge time.Duration `yaml:"wal_cleanup_age"` + WALCleanupPeriod time.Duration `yaml:"wal_cleanup_period"` ServiceConfig ha.Config `yaml:"scraping_service"` ServiceClientConfig client.Config `yaml:"scraping_service_client"` Configs []instance.Config `yaml:"configs,omitempty"` @@ -126,6 +130,8 @@ func (c *Config) ApplyDefaults() error { // RegisterFlags defines flags corresponding to the Config. 
func (c *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&c.WALDir, "prometheus.wal-directory", "", "base directory to store the WAL in") + f.DurationVar(&c.WALCleanupAge, "prometheus.wal-cleanup-age", DefaultConfig.WALCleanupAge, "remove abandoned (unused) WALs older than this") + f.DurationVar(&c.WALCleanupPeriod, "prometheus.wal-cleanup-period", DefaultConfig.WALCleanupPeriod, "how often to check for abandoned WALs") f.DurationVar(&c.InstanceRestartBackoff, "prometheus.instance-restart-backoff", DefaultConfig.InstanceRestartBackoff, "how long to wait before restarting a failed Prometheus instance") c.ServiceConfig.RegisterFlagsWithPrefix("prometheus.service.", f) @@ -141,7 +147,8 @@ type Agent struct { logger log.Logger reg prometheus.Registerer - cm instance.Manager + cm instance.Manager + cleaner *WALCleaner instanceFactory instanceFactory @@ -173,6 +180,16 @@ func newAgent(reg prometheus.Registerer, cfg Config, logger log.Logger, fact ins // collect metrics on the number of active configs. a.cm = instance.NewCountingManager(reg, a.cm) + // Periodically attempt to clean up WALs from instances that aren't being run by + // this agent anymore. 
+ a.cleaner = NewWALCleaner( + a.logger, + a.cm, + cfg.WALDir, + cfg.WALCleanupAge, + cfg.WALCleanupPeriod, + ) + allConfigsValid := true for _, c := range cfg.Configs { if err := a.cm.ApplyConfig(c); err != nil { @@ -230,6 +247,7 @@ func (a *Agent) Stop() { level.Error(a.logger).Log("msg", "failed to stop scraping service server", "err", err) } } + a.cleaner.Stop() a.cm.Stop() } diff --git a/pkg/prom/agent_test.go b/pkg/prom/agent_test.go index 837bc9dbb950..ab5036da2e46 100644 --- a/pkg/prom/agent_test.go +++ b/pkg/prom/agent_test.go @@ -278,6 +278,10 @@ func (i *fakeInstance) TargetsActive() map[string][]*scrape.Target { return nil } +func (i *fakeInstance) StorageDirectory() string { + return "" +} + type fakeInstanceFactory struct { mut sync.Mutex mocks []*fakeInstance diff --git a/pkg/prom/cleaner.go b/pkg/prom/cleaner.go new file mode 100644 index 000000000000..934d43689a44 --- /dev/null +++ b/pkg/prom/cleaner.go @@ -0,0 +1,270 @@ +package prom + +import ( + "fmt" + "os" + "path/filepath" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/grafana/agent/pkg/prom/instance" + "github.com/grafana/agent/pkg/prom/wal" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + promwal "github.com/prometheus/prometheus/tsdb/wal" +) + +const ( + DefaultCleanupAge = 12 * time.Hour + DefaultCleanupPeriod = 30 * time.Minute +) + +var ( + discoveryError = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "agent_prometheus_cleaner_storage_error_total", + Help: "Errors encountered discovering local storage paths", + }, + []string{"storage"}, + ) + + segmentError = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "agent_prometheus_cleaner_segment_error_total", + Help: "Errors encountered finding most recent WAL segments", + }, + []string{"storage"}, + ) + + managedStorage = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: 
"agent_prometheus_cleaner_managed_storage", + Help: "Number of storage directories associated with managed instances", + }, + ) + + abandonedStorage = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "agent_prometheus_cleaner_abandoned_storage", + Help: "Number of storage directories not associated with any managed instance", + }, + ) + + cleanupRunsSuccess = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "agent_prometheus_cleaner_success_total", + Help: "Number of successfully removed abandoned WALs", + }, + ) + + cleanupRunsErrors = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "agent_prometheus_cleaner_errors_total", + Help: "Number of errors removing abandoned WALs", + }, + ) + + cleanupTimes = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "agent_prometheus_cleaner_cleanup_seconds", + Help: "Time spent performing each periodic WAL cleanup", + }, + ) +) + +// lastModifiedFunc gets the last modified time of the most recent segment of a WAL +type lastModifiedFunc func(path string) (time.Time, error) + +func lastModified(path string) (time.Time, error) { + existing, err := promwal.Open(nil, path) + if err != nil { + return time.Time{}, err + } + + // We don't care if there are errors closing the abandoned WAL + defer func() { _ = existing.Close() }() + + _, last, err := existing.Segments() + if err != nil { + return time.Time{}, fmt.Errorf("unable to open WAL: %w", err) + } + + if last == -1 { + return time.Time{}, fmt.Errorf("unable to determine most recent segment for %s", path) + } + + // full path to the most recent segment in this WAL + lastSegment := promwal.SegmentName(path, last) + segmentFile, err := os.Stat(lastSegment) + if err != nil { + return time.Time{}, fmt.Errorf("unable to determine mtime for %s segment: %w", lastSegment, err) + } + + return segmentFile.ModTime(), nil +} + +// WALCleaner periodically checks for Write Ahead Logs (WALs) that are not associated +// with any active instance.ManagedInstance and have not 
been written to in some configured +// amount of time and deletes them. +type WALCleaner struct { + logger log.Logger + instanceManager instance.Manager + walDirectory string + walLastModified lastModifiedFunc + minAge time.Duration + period time.Duration + done chan bool +} + +// NewWALCleaner creates a new cleaner that looks for abandoned WALs in the given +// directory and removes them if they haven't been modified in over minAge. Starts +// a goroutine to periodically run the cleanup method in a loop +func NewWALCleaner(logger log.Logger, manager instance.Manager, walDirectory string, minAge time.Duration, period time.Duration) *WALCleaner { + c := &WALCleaner{ + logger: log.With(logger, "component", "cleaner"), + instanceManager: manager, + walDirectory: filepath.Clean(walDirectory), + walLastModified: lastModified, + minAge: DefaultCleanupAge, + period: DefaultCleanupPeriod, + done: make(chan bool), + } + + if minAge > 0 { + c.minAge = minAge + } + + // We allow a period of 0 here because '0' means "don't run the task". This + // is handled by not running a ticker at all in the run method. + if period >= 0 { + c.period = period + } + + go c.run() + return c +} + +// getManagedStorage gets storage directories used for each ManagedInstance +func (c *WALCleaner) getManagedStorage(instances map[string]instance.ManagedInstance) map[string]bool { + out := make(map[string]bool) + + for _, inst := range instances { + out[inst.StorageDirectory()] = true + } + + return out +} + +// getAllStorage gets all storage directories under walDirectory +func (c *WALCleaner) getAllStorage() []string { + var out []string + + _ = filepath.Walk(c.walDirectory, func(p string, info os.FileInfo, err error) error { + if os.IsNotExist(err) { + // The root WAL directory doesn't exist. Maybe this Agent isn't responsible for any + // instances yet. Log at debug since this isn't a big deal. We'll just try to crawl + // the directory again on the next periodic run. 
+ level.Debug(c.logger).Log("msg", "WAL storage path does not exist", "path", p, "err", err) + } else if err != nil { + // Just log any errors traversing the WAL directory. This will potentially result + // in a WAL (that has incorrect permissions or some similar problem) not being cleaned + // up. This is better than preventing *all* other WALs from being cleaned up. + discoveryError.WithLabelValues(p).Inc() + level.Warn(c.logger).Log("msg", "unable to traverse WAL storage path", "path", p, "err", err) + } else if info.IsDir() && filepath.Dir(p) == c.walDirectory { + // Single level below the root are instance storage directories (including WALs) + out = append(out, p) + } + + return nil + }) + + return out +} + +// getAbandonedStorage gets the full path of storage directories that aren't associated with +// an active instance and haven't been written to within a configured duration (usually several +// hours or more). +func (c *WALCleaner) getAbandonedStorage(all []string, managed map[string]bool, now time.Time) []string { + var out []string + + for _, dir := range all { + if managed[dir] { + level.Debug(c.logger).Log("msg", "active WAL", "name", dir) + continue + } + + walDir := wal.SubDirectory(dir) + mtime, err := c.walLastModified(walDir) + if err != nil { + segmentError.WithLabelValues(dir).Inc() + level.Warn(c.logger).Log("msg", "unable to find segment mtime of WAL", "name", dir, "err", err) + continue + } + + diff := now.Sub(mtime) + if diff > c.minAge { + // The last segment for this WAL was modified more than $minAge (positive number of hours) + // in the past. This makes it a candidate for deletion since it's also not associated with + // any Instances this agent knows about. 
+ out = append(out, dir) + } + + level.Debug(c.logger).Log("msg", "abandoned WAL", "name", dir, "mtime", mtime, "diff", diff) + } + + return out +} + +// run cleans up abandoned WALs (if period != 0) in a loop periodically until stopped +func (c *WALCleaner) run() { + // A period of 0 means don't run a cleanup task + if c.period == 0 { + return + } + + ticker := time.NewTicker(c.period) + defer ticker.Stop() + + for { + select { + case <-c.done: + level.Debug(c.logger).Log("msg", "stopping cleaner...") + return + case <-ticker.C: + c.cleanup() + } + } +} + +// cleanup removes any abandoned and unused WAL directories. Note that it shouldn't be +// necessary to call this method explicitly in most cases since it will be run periodically +// in a goroutine (started when WALCleaner is created). +func (c *WALCleaner) cleanup() { + start := time.Now() + all := c.getAllStorage() + managed := c.getManagedStorage(c.instanceManager.ListInstances()) + abandoned := c.getAbandonedStorage(all, managed, time.Now()) + + managedStorage.Set(float64(len(managed))) + abandonedStorage.Set(float64(len(abandoned))) + + for _, a := range abandoned { + level.Info(c.logger).Log("msg", "deleting abandoned WAL", "name", a) + err := os.RemoveAll(a) + if err != nil { + level.Error(c.logger).Log("msg", "failed to delete abandoned WAL", "name", a, "err", err) + cleanupRunsErrors.Inc() + } else { + cleanupRunsSuccess.Inc() + } + } + + cleanupTimes.Observe(time.Since(start).Seconds()) +} + +// Stop the cleaner and any background tasks running +func (c *WALCleaner) Stop() { + close(c.done) +} diff --git a/pkg/prom/cleaner_test.go b/pkg/prom/cleaner_test.go new file mode 100644 index 000000000000..c1e0aec166a8 --- /dev/null +++ b/pkg/prom/cleaner_test.go @@ -0,0 +1,155 @@ +package prom + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/grafana/agent/pkg/prom/instance" + "github.com/stretchr/testify/require" +) + +func 
TestWALCleaner_getAllStorageNoRoot(t *testing.T) { + walRoot := filepath.Join(os.TempDir(), "getAllStorageNoRoot") + logger := log.NewLogfmtLogger(os.Stderr) + cleaner := NewWALCleaner( + logger, + &instance.MockManager{}, + walRoot, + DefaultCleanupAge, + DefaultCleanupPeriod, + ) + + // Bogus WAL root that doesn't exist. Method should return no results + wals := cleaner.getAllStorage() + + require.Empty(t, wals) +} + +func TestWALCleaner_getAllStorageSuccess(t *testing.T) { + walRoot, err := ioutil.TempDir(os.TempDir(), "getAllStorageSuccess") + require.NoError(t, err) + defer os.RemoveAll(walRoot) + + walDir := filepath.Join(walRoot, "instance-1") + err = os.MkdirAll(walDir, 0755) + require.NoError(t, err) + + logger := log.NewLogfmtLogger(os.Stderr) + cleaner := NewWALCleaner( + logger, + &instance.MockManager{}, + walRoot, + DefaultCleanupAge, + DefaultCleanupPeriod, + ) + wals := cleaner.getAllStorage() + + require.Equal(t, []string{walDir}, wals) +} + +func TestWALCleaner_getAbandonedStorageBeforeCutoff(t *testing.T) { + walRoot, err := ioutil.TempDir(os.TempDir(), "getAbandonedStorageBeforeCutoff") + require.NoError(t, err) + defer os.RemoveAll(walRoot) + + walDir := filepath.Join(walRoot, "instance-1") + err = os.MkdirAll(walDir, 0755) + require.NoError(t, err) + + all := []string{walDir} + managed := make(map[string]bool) + now := time.Now() + + logger := log.NewLogfmtLogger(os.Stderr) + cleaner := NewWALCleaner( + logger, + &instance.MockManager{}, + walRoot, + 5*time.Minute, + DefaultCleanupPeriod, + ) + + cleaner.walLastModified = func(path string) (time.Time, error) { + return now, nil + } + + // Last modification time on our WAL directory is the same as "now" + // so there shouldn't be any results even though it's not part of the + // set of "managed" directories. 
+ abandoned := cleaner.getAbandonedStorage(all, managed, now) + require.Empty(t, abandoned) +} + +func TestWALCleaner_getAbandonedStorageAfterCutoff(t *testing.T) { + walRoot, err := ioutil.TempDir(os.TempDir(), "getAbandonedStorageAfterCutoff") + require.NoError(t, err) + defer os.RemoveAll(walRoot) + + walDir := filepath.Join(walRoot, "instance-1") + err = os.MkdirAll(walDir, 0755) + require.NoError(t, err) + + all := []string{walDir} + managed := make(map[string]bool) + now := time.Now() + + logger := log.NewLogfmtLogger(os.Stderr) + cleaner := NewWALCleaner( + logger, + &instance.MockManager{}, + walRoot, + 5*time.Minute, + DefaultCleanupPeriod, + ) + + cleaner.walLastModified = func(path string) (time.Time, error) { + return now.Add(-30 * time.Minute), nil + } + + // Last modification time on our WAL directory is 30 minutes in the past + // compared to "now" and we've set the cutoff for our cleaner to be 5 + // minutes: our WAL directory should show up as abandoned + abandoned := cleaner.getAbandonedStorage(all, managed, now) + require.Equal(t, []string{walDir}, abandoned) +} + +func TestWALCleaner_cleanup(t *testing.T) { + walRoot, err := ioutil.TempDir(os.TempDir(), "cleanup") + require.NoError(t, err) + defer os.RemoveAll(walRoot) + + walDir := filepath.Join(walRoot, "instance-1") + err = os.MkdirAll(walDir, 0755) + require.NoError(t, err) + + now := time.Now() + logger := log.NewLogfmtLogger(os.Stderr) + manager := &instance.MockManager{} + manager.ListInstancesFunc = func() map[string]instance.ManagedInstance { + return make(map[string]instance.ManagedInstance) + } + + cleaner := NewWALCleaner( + logger, + manager, + walRoot, + 5*time.Minute, + DefaultCleanupPeriod, + ) + + cleaner.walLastModified = func(path string) (time.Time, error) { + return now.Add(-30 * time.Minute), nil + } + + // Last modification time on our WAL directory is 30 minutes in the past + // compared to "now" and we've set the cutoff for our cleaner to be 5 + // minutes: our WAL 
directory should be removed since it's abandoned + cleaner.cleanup() + _, err = os.Stat(walDir) + require.Error(t, err) + require.True(t, os.IsNotExist(err)) +} diff --git a/pkg/prom/http_test.go b/pkg/prom/http_test.go index 37f8dad11ddf..3946311a645b 100644 --- a/pkg/prom/http_test.go +++ b/pkg/prom/http_test.go @@ -143,3 +143,7 @@ func (i *mockInstanceScrape) Update(_ instance.Config) error { func (i *mockInstanceScrape) TargetsActive() map[string][]*scrape.Target { return i.tgts } + +func (i *mockInstanceScrape) StorageDirectory() string { + return "" +} diff --git a/pkg/prom/instance/instance.go b/pkg/prom/instance/instance.go index 1ef2f1815475..fd09d971a68f 100644 --- a/pkg/prom/instance/instance.go +++ b/pkg/prom/instance/instance.go @@ -486,6 +486,10 @@ func (i *Instance) TargetsActive() map[string][]*scrape.Target { return mgr.TargetsActive() } +func (i *Instance) StorageDirectory() string { + return i.wal.Directory() +} + type discoveryService struct { Manager *discovery.Manager diff --git a/pkg/prom/instance/manager.go b/pkg/prom/instance/manager.go index 904df3614979..655d8a3699a8 100644 --- a/pkg/prom/instance/manager.go +++ b/pkg/prom/instance/manager.go @@ -59,6 +59,7 @@ type ManagedInstance interface { Run(ctx context.Context) error Update(c Config) error TargetsActive() map[string][]*scrape.Target + StorageDirectory() string } // BasicManagerConfig controls the operations of a BasicManager. 
diff --git a/pkg/prom/instance/manager_test.go b/pkg/prom/instance/manager_test.go index 4f8e71ffba7b..7d9f0a367792 100644 --- a/pkg/prom/instance/manager_test.go +++ b/pkg/prom/instance/manager_test.go @@ -97,9 +97,10 @@ func TestBasicManager_ApplyConfig(t *testing.T) { } type mockInstance struct { - RunFunc func(ctx context.Context) error - UpdateFunc func(c Config) error - TargetsActiveFunc func() map[string][]*scrape.Target + RunFunc func(ctx context.Context) error + UpdateFunc func(c Config) error + TargetsActiveFunc func() map[string][]*scrape.Target + StorageDirectoryFunc func() string } func (m mockInstance) Run(ctx context.Context) error { @@ -122,3 +123,10 @@ func (m mockInstance) TargetsActive() map[string][]*scrape.Target { } panic("TargetsActiveFunc not provided") } + +func (m mockInstance) StorageDirectory() string { + if m.StorageDirectoryFunc != nil { + return m.StorageDirectoryFunc() + } + panic("StorageDirectoryFunc not provided") +} diff --git a/pkg/prom/instance/noop.go b/pkg/prom/instance/noop.go index 58d7e955afa2..a9d399af0f5e 100644 --- a/pkg/prom/instance/noop.go +++ b/pkg/prom/instance/noop.go @@ -22,3 +22,7 @@ func (NoOpInstance) Update(_ Config) error { func (NoOpInstance) TargetsActive() map[string][]*scrape.Target { return nil } + +func (NoOpInstance) StorageDirectory() string { + return "" +} diff --git a/pkg/prom/wal/util.go b/pkg/prom/wal/util.go index 0582df20718b..defa82d5f929 100644 --- a/pkg/prom/wal/util.go +++ b/pkg/prom/wal/util.go @@ -1,6 +1,7 @@ package wal import ( + "path/filepath" "sync" "github.com/prometheus/prometheus/tsdb/record" @@ -109,3 +110,8 @@ func (c *walDataCollector) StoreSeries(series []record.RefSeries, _ int) { } func (c *walDataCollector) SeriesReset(_ int) {} + +// Get the subdirectory within a Storage directory used for the Prometheus WAL +func SubDirectory(base string) string { + return filepath.Join(base, "wal") +} diff --git a/pkg/prom/wal/wal.go b/pkg/prom/wal/wal.go index 
23367dc4b95d..4e9c531bfad7 100644 --- a/pkg/prom/wal/wal.go +++ b/pkg/prom/wal/wal.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "math" - "path/filepath" "sync" "time" @@ -121,7 +120,7 @@ type Storage struct { // NewStorage makes a new Storage. func NewStorage(logger log.Logger, registerer prometheus.Registerer, path string) (*Storage, error) { - w, err := wal.NewSize(logger, registerer, filepath.Join(path, "wal"), wal.DefaultSegmentSize, true) + w, err := wal.NewSize(logger, registerer, SubDirectory(path), wal.DefaultSegmentSize, true) if err != nil { return nil, err }