Introduce TSDB blocks compactor #1942
Conversation
Looks really clean, well done.
for _, userID := range users {
	// Ensure the context has not been canceled (i.e. compactor shutdown has been triggered).
	if ctx.Err() != nil {
This statement looks funny to my eyes. Can this be a select with <-ctx.Done()?
You mean changing it to something like the following? If yes, what's the real benefit? To me it looks like we're writing the same thing using more lines of code.
select {
case <-c.ctx.Done():
	return c.ctx.Err()
default:
}
Yeah, it just felt more idiomatic to me to write it that way. But I have no qualms leaving it the way it is.
Personally, I would only use the select version if there were more things to select on. ctx.Err() returns a non-nil value exactly when the context is finished, so I think this is fine.
syncer, err := compact.NewSyncer(
	c.logger,
	nil, // TODO(pracucci) we should pass the prometheus registerer, but we would need to inject the user label to each metric, otherwise we have clashing metrics
I've found this to be troublesome, since we likely don't want per-user metrics anyway, but rather a rollup of all the metrics across users.
I see your point. I don't have an answer yet. As stated in the PR description, I would prefer to defer it to a subsequent PR, so as not to block the compactor because of this.
What's your take? Do you have any idea how to solve this?
Oh, definitely don't block this PR for that. I too punted on a solution to this for the other user-wrapped Thanos components. Just wanted to add my thoughts to the comment.
I'm not sure there can be a good solution for this without changing the Thanos code to only call the register function once.
Oh definitely don't block this PR for that
👍
I'm not sure there can be a good solution for this without changing the Thanos code
An option, in Thanos, would be exposing compact.syncerMetrics and accepting an instance of it as input in compact.NewSyncer(), so that we could create it once in Cortex and pass the same syncerMetrics instance to multiple Syncers.
@bwplotka would you consider such a refactoring in Thanos feasible, just to help Cortex? Would you see a better way to do it?
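To illustrate the register-once / share-the-instance idea, here is a toy sketch. The names syncerMetrics and newSyncer are hypothetical stand-ins for the (unexported) Thanos types, and registry is a minimal model of a Prometheus registerer's duplicate-name behaviour, not the real API:

```go
package main

import (
	"errors"
	"fmt"
)

// registry mimics the relevant behaviour of a Prometheus registerer:
// registering the same metric name twice is an error (the "clashing
// metrics" problem mentioned in the TODO above).
type registry map[string]bool

func (r registry) register(name string) error {
	if r[name] {
		return errors.New("duplicate metrics collector registration attempted: " + name)
	}
	r[name] = true
	return nil
}

// syncerMetrics is a hypothetical stand-in for Thanos' unexported
// compact.syncerMetrics struct.
type syncerMetrics struct{ syncs int }

type syncer struct{ metrics *syncerMetrics }

// newSyncer takes the shared metrics instance as input, mirroring the
// proposed compact.NewSyncer() refactoring.
func newSyncer(m *syncerMetrics) *syncer { return &syncer{metrics: m} }

func (s *syncer) sync() { s.metrics.syncs++ }

func main() {
	reg := registry{}
	// Register the metrics once...
	if err := reg.register("blocks_meta_syncs_total"); err != nil {
		panic(err)
	}
	m := &syncerMetrics{}
	// ...then hand the same instance to one syncer per user.
	for range []string{"user-1", "user-2"} {
		newSyncer(m).sync()
	}
	fmt.Println(m.syncs) // both per-user syncers updated the shared instance
}
```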
We want to treat any Thanos package as a library, so if the use case of a struct in our package makes sense, then we are ok with it. (:
We can definitely allow passing metrics to the syncer. Also, note that we changed the syncer a bit and introduced block.MetadataFetcher. I assume you use different syncers because of many buckets?
Force-pushed from e4c021f to 3daa7ee.
Some initial review, haven't looked at tests yet.
pkg/compactor/compactor.go
Outdated
f.Var(
	&cfg.BlockRanges,
	"compactor.block-ranges",
	"Comma separated list of compaction ranges expressed in the time duration format")

f.DurationVar(
	&cfg.ConsistencyDelay,
	"compactor.consistency-delay",
	30*time.Minute,
	fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumAgeForRemoval))

f.IntVar(
	&cfg.BlockSyncConcurrency,
	"compactor.block-sync-concurrency",
	20,
	"Number of goroutines to use when syncing block metadata from object storage")

f.StringVar(
	&cfg.DataDir,
	"compactor.data-dir",
	"./data",
	"Data directory in which to cache blocks and process compactions")

f.DurationVar(
	&cfg.CompactionInterval,
	"compactor.compaction-interval",
	time.Hour,
	"The frequency at which the compaction runs")

f.IntVar(
	&cfg.CompactionRetries,
	"compactor.compaction-retries",
	3,
	"How many times to retry a failed compaction during a single compaction interval")
Single-line flag registration makes it much easier to find the flag you are looking for, though it makes single lines very long. For example: https://github.com/cortexproject/cortex/blob/3daa7eeebe49741746c66b503e0a29bb34c03f5a/pkg/ingester/ingester.go#L137-L147 reads almost like a table of all the flags and their defaults (when viewed without line wrapping).
Force-pushed from aed461d to cfa680a.
@thorfour @pstibrany @codesome Could you take another look and approve if you don't have further comments and believe the current PR is good to be merged, please? It would help as a signal for maintainers.
LGTM. Awesome work! 🎉
A note for the future: We can parallelize at the user level if it becomes necessary.
Force-pushed from 65fc6e6 to 9746548.
Integration tests are failing because of this unrelated issue (looks like a temporary networking issue):
Nice job. Looks good to me with some tiny nits (as usual :))
// Set implements the flag.Value interface
func (d *DurationList) Set(s string) error {
	values := strings.Split(s, ",")
	*d = make([]time.Duration, 0, len(values)) // flag.Parse may be called twice, so overwrite instead of append
This is tricky. The flag package supports passing the same parameter multiple times, in which case it will also call the Set method multiple times. A more flag-compatible way of using it would therefore be: -compactor.block-ranges=2h -compactor.block-ranges=48h. It's OK if we want to support a list flag, but we should probably still append ranges from multiple Set calls.
In either case, should these ranges be validated for being in the correct order before they are used?
I guess I was hedging a little too much against obviousness of behaviour. I thought if you had multiple flags with duration, it wasn't obvious in which order they are applied. But looking back on it now, it's pretty obvious they'll be sorted by time. I think it's probably a good change to get rid of that, and just use multiple flags if desired.
The clearing of the duration list was because the flag parse function is called twice from the main package, which was resulting in a double list being created, despite the user only submitting a single list.
I like the single comma-separated parameter option as well, but I think that multiple parameters should be supported, and "append" is the correct option to use here, instead of overwrite.
About sorting: where does that happen? I haven't found validation nor sorting (but I haven't looked that hard).
The only reason for double-parsing is that we need the -config.file option to parse the config file, and then reparse command line flags so that they take precedence. Easy to fix. I'll send a separate PR. It came up here because this Set method is using incorrect semantics (replacing instead of appending flags).
I've sent #1997 now.
Now that #1997 is merged (with a subsequent fix as well), it would make sense to use *d = append(*d, values...) here, to make it work like a standard flag. What do you think?
Ah, I guess I'm too late. I can send separate PR later. :) Great to see this merged!
I didn't hold off merging this PR to address this, because it's not a regression introduced by this PR. I would be glad if you could take care of that 🙏
Force-pushed from a2a6a7a to 825539d.
Thanks @pstibrany for your review. I've addressed all the comments, except the one about
Force-pushed from 825539d to 0bcd40c.
LGTM, one minor nit
cfg.retryMinBackoff = 10 * time.Second
cfg.retryMaxBackoff = time.Minute

f.Var(&cfg.BlockRanges, "compactor.block-ranges", "Comma separated list of compaction ranges expressed in the time duration format")
Could you add the default values to the help text?
There's actually no need. When the --help output is generated, the defaults are displayed by reading them from the values already set:
-compactor.block-ranges value
	Comma separated list of compaction ranges expressed in the time duration format (default 2h0m0s,12h0m0s,24h0m0s)
Awesome
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Force-pushed from 0bcd40c to 56adef2.
What this PR does:
Following this design doc, this PR introduces a new component, called compactor, which uses the Thanos bucket compactor to compact TSDB blocks stored in the bucket.
I've got the compactor running in a dev cluster and, as expected, it significantly reduces the memory used by the querier, due to better optimized Thanos index headers. For example, in the following chart (showing queriers memory usage), the number of blocks dropped from about 1.8K to 50 (1 per day, 50 days retention so far):
There are a few things left out of this PR that will be addressed in subsequent PRs (see the TODO in pkg/compactor/compactor.go).
Which issue(s) this PR fixes:
N/A
Checklist
CHANGELOG.md updated (the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX])