-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
receive, rule: Lock TSDB directories #2915
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
09b71a6
Use lock files for Receive TSDBs
kakkoyun 1442bde
Use lock files for Rule TSDBs
kakkoyun f82e882
Clean up TSDB lock files on start up
kakkoyun 57c33d2
Add no-lockfile flag to control behavior
kakkoyun ba7a164
Update documentation
kakkoyun 52ff3b6
Add changelog
kakkoyun 07e7ea7
Address review issues
kakkoyun File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ import ( | |
"math/rand" | ||
"net/http" | ||
"net/url" | ||
"os" | ||
"path/filepath" | ||
"strconv" | ||
"strings" | ||
|
@@ -25,9 +26,11 @@ import ( | |
"github.com/prometheus/prometheus/pkg/labels" | ||
"github.com/prometheus/prometheus/promql" | ||
"github.com/prometheus/prometheus/rules" | ||
tsdb "github.com/prometheus/prometheus/tsdb" | ||
"github.com/prometheus/prometheus/tsdb" | ||
tsdberrors "github.com/prometheus/prometheus/tsdb/errors" | ||
"github.com/prometheus/prometheus/util/strutil" | ||
"gopkg.in/alecthomas/kingpin.v2" | ||
|
||
"github.com/thanos-io/thanos/pkg/alert" | ||
"github.com/thanos-io/thanos/pkg/block/metadata" | ||
"github.com/thanos-io/thanos/pkg/component" | ||
|
@@ -51,7 +54,6 @@ import ( | |
"github.com/thanos-io/thanos/pkg/tls" | ||
"github.com/thanos-io/thanos/pkg/tracing" | ||
"github.com/thanos-io/thanos/pkg/ui" | ||
"gopkg.in/alecthomas/kingpin.v2" | ||
) | ||
|
||
// registerRule registers a rule command. | ||
|
@@ -77,7 +79,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { | |
Default("2h")) | ||
tsdbRetention := modelDuration(cmd.Flag("tsdb.retention", "Block retention time on local disk."). | ||
Default("48h")) | ||
|
||
noLockFile := cmd.Flag("tsdb.no-lockfile", "Do not create lockfile in TSDB data directory. In any case, the lockfiles will be deleted on next startup.").Default("false").Bool() | ||
walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool() | ||
|
||
alertmgrs := cmd.Flag("alertmanagers.url", "Alertmanager replica URLs to push firing alerts. Ruler claims success if push to at least one alertmanager from discovered succeeds. The scheme should not be empty e.g `http` might be used. The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Alertmanager IPs through respective DNS lookups. The port defaults to 9093 or the SRV record's value. The URL path is used as a prefix for the regular Alertmanager API path."). | ||
|
@@ -134,7 +136,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { | |
MinBlockDuration: int64(time.Duration(*tsdbBlockDuration) / time.Millisecond), | ||
MaxBlockDuration: int64(time.Duration(*tsdbBlockDuration) / time.Millisecond), | ||
RetentionDuration: int64(time.Duration(*tsdbRetention) / time.Millisecond), | ||
NoLockfile: true, | ||
NoLockfile: *noLockFile, | ||
WALCompression: *walCompression, | ||
} | ||
|
||
|
@@ -350,6 +352,12 @@ func runRule( | |
if err != nil { | ||
return errors.Wrap(err, "open TSDB") | ||
} | ||
|
||
level.Debug(logger).Log("msg", "removing storage lock file if any") | ||
if err := removeLockfileIfAny(logger, dataDir); err != nil { | ||
return errors.Wrap(err, "remove storage lock files") | ||
} | ||
|
||
{ | ||
done := make(chan struct{}) | ||
g.Add(func() error { | ||
|
@@ -642,6 +650,21 @@ func runRule( | |
return nil | ||
} | ||
|
||
func removeLockfileIfAny(logger log.Logger, dataDir string) error { | ||
absdir, err := filepath.Abs(dataDir) | ||
if err != nil { | ||
return err | ||
} | ||
if err := os.Remove(filepath.Join(absdir, "lock")); err != nil { | ||
if os.IsNotExist(err) { | ||
return nil | ||
} | ||
return err | ||
} | ||
level.Info(logger).Log("msg", "a leftover lockfile found and removed") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nice and explicit! 👍 |
||
return nil | ||
} | ||
|
||
func parseFlagLabels(s []string) (labels.Labels, error) { | ||
var lset labels.Labels | ||
for _, l := range s { | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ import ( | |
"io/ioutil" | ||
"os" | ||
"path" | ||
"path/filepath" | ||
"sync" | ||
|
||
"github.com/go-kit/kit/log" | ||
|
@@ -213,6 +214,32 @@ func (t *MultiTSDB) Sync(ctx context.Context) error { | |
return merr.Err() | ||
} | ||
|
||
func (t *MultiTSDB) RemoveLockFilesIfAny() error { | ||
fis, err := ioutil.ReadDir(t.dataDir) | ||
if err != nil { | ||
if os.IsNotExist(err) { | ||
return nil | ||
} | ||
return err | ||
} | ||
|
||
merr := terrors.MultiError{} | ||
for _, fi := range fis { | ||
if !fi.IsDir() { | ||
continue | ||
} | ||
if err := os.Remove(filepath.Join(t.defaultTenantDataDir(fi.Name()), "lock")); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I would reuse the |
||
if os.IsNotExist(err) { | ||
continue | ||
} | ||
merr.Add(err) | ||
continue | ||
} | ||
level.Info(t.logger).Log("msg", "a leftover lockfile found and removed", "tenant", fi.Name()) | ||
} | ||
return merr.Err() | ||
} | ||
|
||
func (t *MultiTSDB) TSDBStores() map[string]storepb.StoreServer { | ||
t.mtx.RLock() | ||
defer t.mtx.RUnlock() | ||
|
@@ -230,7 +257,7 @@ func (t *MultiTSDB) TSDBStores() map[string]storepb.StoreServer { | |
func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant) error { | ||
reg := prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID}, t.reg) | ||
lbls := append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID}) | ||
dataDir := path.Join(t.dataDir, tenantID) | ||
dataDir := t.defaultTenantDataDir(tenantID) | ||
|
||
level.Info(logger).Log("msg", "opening TSDB") | ||
opts := *t.tsdbOpts | ||
|
@@ -263,6 +290,10 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant | |
return nil | ||
} | ||
|
||
func (t *MultiTSDB) defaultTenantDataDir(tenantID string) string { | ||
return path.Join(t.dataDir, tenantID) | ||
} | ||
|
||
func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenant, error) { | ||
// Fast path, as creating tenants is a very rare operation. | ||
t.mtx.RLock() | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm... this is actually something we can improve. Let's add a TODO and ensure there is an issue for this. We can leverage vertical compaction just fine in this case.