WIP Graphitey consolidation step 2 and 3 #557
chunkMinTs := now - (now % ms.chunkSpan) - uint32(ms.chunkMaxStale)
metricMinTs := now - (now % ms.chunkSpan) - uint32(ms.metricMaxStale)
chunkMinTs := now - uint32(ms.chunkMaxStale)
metricMinTs := now - uint32(ms.metricMaxStale)
I don't recall why we had to subtract the chunkspan for these calculations. If we still have to do this, it would be trickier, as the chunkspan is specific to the metric that we'll check.
The old code calculated the last chunk T0 that is older than chunkMaxStale. I believe this is because the logic in AggMetric.GC() compared the chunkMinTs and metricMinTs to the chunk.T0, but as we now compare against chunk.LastWrite, this new way should be fine.
The question this raises, though, is: should our maxStale settings be per retention policy? If a MT service is receiving some metrics every 10 seconds and some every few hours, then applying the same maxStale settings to both is not ideal.
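To make the difference concrete, here is a minimal sketch contrasting the old T0-aligned cutoff with the new LastWrite-based one. The function names are hypothetical and the parameters stand in for ms.chunkSpan / ms.chunkMaxStale; this is not metrictank's actual code.

```go
package main

import "fmt"

// Old: snap "now" down to the start of the current chunk before
// subtracting the staleness window, so the cutoff is a chunk T0.
func oldChunkMinTs(now, chunkSpan, chunkMaxStale uint32) uint32 {
	return now - (now % chunkSpan) - chunkMaxStale
}

// New: subtract the staleness window directly, since GC now compares
// against chunk.LastWrite instead of chunk.T0.
func newChunkMinTs(now, chunkMaxStale uint32) uint32 {
	return now - chunkMaxStale
}

func main() {
	now := uint32(1000)
	fmt.Println(oldChunkMinTs(now, 600, 300)) // 1000 - 400 - 300 = 300
	fmt.Println(newChunkMinTs(now, 300))      // 1000 - 300 = 700
}
```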
Force-pushed from e40a6a0 to 05555be
todo:
Force-pushed from 85f424b to b68228a
Force-pushed from b68228a to 0da8774
TODO: manual tests:
Note that whisper files and structures are untouched; we only use this extended format for metrictank. Also: on failure to parse, return an error.
ChunkSpan, NumChunks and Ready fields are now supported, also optionally specified via the config file
so we can efficiently reference a schema or aggregation definition also make WhisperAggregation.Match public
because we want to work with them in metrictank
break
}
}

if req.Archive == -1 {
return nil, errUnSatisfiable
do we really want the whole http request to end up in an error if one of the reqs was not satisfiable? couldn't we just skip that one and continue with the others?
IMHO yes. This is such a rare case (basically all archives have ready=false). It's okay to make this problem very clear so that the admin can fix it. Once we have grafana/grafana#6448 we could make this a bit nicer (it would allow us to show a warning/error and still return the data we can).
api/query_engine.go (outdated):
// let's see first if we can deliver it via lower-res rollup archives, if we have any
retentions := mdata.GetRetentions(req.SchemaI)
for i, ret := range retentions[req.Archive+1:] {
If !ret.Ready, then the retention will not be useful here, so we might as well do if !ret.Ready { continue } and skip the potentially unnecessary uint32(ret.SecondsPerPoint()). Then it can be removed from the condition 2 lines down.
So you'd add another if clause? I don't see how that's so much better.
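For comparison, a sketch of what the suggested early continue would look like. The Retention type and pickArchive helper here are simplified stand-ins for illustration, not metrictank's actual code:

```go
package main

import "fmt"

// Retention is a hypothetical stand-in for the real retention type.
type Retention struct {
	secondsPerPoint int
	Ready           bool
}

func (r Retention) SecondsPerPoint() int { return r.secondsPerPoint }

// pickArchive skips archives that are not ready up front, so the
// interval conversion (and the ready check in the condition two lines
// down) only happens for usable archives.
func pickArchive(retentions []Retention, interval uint32) int {
	for i, ret := range retentions {
		if !ret.Ready {
			continue // not queryable, don't even compute its interval
		}
		archInterval := uint32(ret.SecondsPerPoint())
		if interval == archInterval {
			return i // exact match: cheaper than runtime consolidation
		}
	}
	return -1
}

func main() {
	rets := []Retention{{10, false}, {60, true}, {600, true}}
	fmt.Println(pickArchive(rets, 60)) // 1
}
```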
api/query_engine.go (outdated):
if interval == archInterval && ret.Ready {
// we're in luck. this will be more efficient than runtime consolidation
req.Archive = req.Archive + 1 + i
req.ArchInterval = uint32(ret.SecondsPerPoint())
We already did that conversion, so this could be archInterval.
@@ -67,15 +67,18 @@ func (in DefaultHandler) Process(metric *schema.MetricData, partition int32) {
return
}

schemaI, _ := mdata.MatchSchema(metric.Name)
If I see that right, this means that every time a metric datapoint arrives we're going to do a regex pattern match at https://github.com/raintank/metrictank/blob/0da87741ab2039f5c3ff2988eb2c74c5bd5b87eb/vendor/github.com/lomik/go-carbon/persister/whisper_schema.go#L87
I'm wondering if it wouldn't be more efficient to associate these schemaI and aggI with the Nodes in idx/memory/memory.go, and then this regex match would only be necessary once per metric, when a new metric gets added to the index.
2 pattern matches actually: one for schema, one for aggregation.
That's a good idea. So basically, instead of passing the schemaI and aggI into AddOrUpdate, AddOrUpdate would return those values instead?
That seems like a good idea to me. For now I've purposely left all the matching obviously unoptimized, to get an idea of how it would perform, but you're right that it'll likely have quite the impact. I still have to measure it though.
I think it would make sense to have idx.AddOrUpdate() return idx.Node
> I think it would make sense to have idx.AddOrUpdate() return idx.Node

In the case where the metric already exists, we can currently just check for the def in DefById via a simple id lookup, and return if it exists. To support what you suggest, I think we would either have to do a lookup in the tree in that case, or somehow link to the idx.Node, with extra data in DefsById or something.
I think it's not the index's concern that the caller of the index would like to know these properties about an entry when it tries to add a new entry; I'd rather keep the index focused on its task of being an index rather than trying to accommodate this.
The current caching mechanism seems to work well enough.
Overall looking really good to me.
I would rather just pull out all that code and have our own mini library (based on that patched code) for parsing the config files, and none of the other stuff that that library comes with (e.g. the whisper-specific bits). But that's lower prio.
Force-pushed from 0da8774 to 486d645
@replay see the new commits.
Force-pushed from 0842aa1 to 72f4cf4
1) use storage-schemas.conf and storage-aggregation.conf to configure numchunks, chunkspans, ttls and aggregations. For carbon, also to find out the raw interval. For other input plugins we only honor the interval specified in the data. Note: persist messages now need to mention the metric names, in case metrics need to be created on the other end.
2) support configurable retention schemes based on patterns, like graphite. Adjust alignRequests accordingly.
3) support configurable aggregation bands to control which rollups get created per metric pattern.
4) add back 'last' roll-up aggregation and runtime consolidation. It was removed in #142 / 2be3546 based on a discussion in https://github.com/raintank/strategy/issues/11, on the premise that storing this extra series for all our metrics wasn't worth it, and it's not queryable via consolidateBy, which only does sum, avg, min and max. However:
1) now aggregations are configurable: one only enables specific aggregation bands on an as-needed basis.
2) graphite supports last rollups, so we must too.
3) for certain types of data (e.g. counters) it's simply the best approach (better than all the rest).
4) storage-level aggregation is not as tied to consolidateBy() as we originally made it out to be, so that point doesn't really apply anymore.
5) we also restore the 'last' runtime consolidation code; even though we will no longer do runtime consolidation in MT for now, the upcoming normalization uses the same code, and we should use 'last' for normalization if the rollup was 'last'. (see next commit for more info)
"runtime consolidation" used to be controlled by consolidateBy and affected consolidation after api processing by (and in) graphite, as well as before (when normalizing data before processing). Now these two are distinct:
- only the post-function processing should be affected by consolidateBy, and should be referred to as "runtime consolidation". Since this code is currently in graphite, metrictank shouldn't really do anything. In particular, we still parse out consolidateBy(..., "func") since that's what graphite-metrictank sends, but we ignore it.
- the only runtime consolidation metrictank currently does is normalizing archives (making sure they are in the same resolution) before feeding them into the api processing pipeline. To avoid confusion this is now called normalizing, akin to `normalize` in graphite's functions.py. This is an extension of the consolidation mechanism used to create the rollup archive. This used to be controlled by consolidateBy, but not anymore. Now:
* the archive being read from is whatever is the primary (first) aggregationMethod defined in storage-aggregation.conf.
* the consolidation method for normalization is the same (always).
Both are controlled via the consolidation property of Req. Later we can extend this to choose the method via queries, but it'll have to use a mechanism separate from consolidateBy.
Before this, all the regex matching dominated the cpu profile. With this, cpu usage reduced by easily 5x. Though we still have (flat / cumulative):

70ms 0.86% 69.79% 770ms 9.49% github.com/raintank/metrictank/mdata/matchcache.(*Cache).Get

due to the map locking.
We could further optimize this, probably, by changing the idx.AddOrUpdate signature to return SchemaI and AggI, instead of requiring them as input, as @replay suggested. This way we only have to match if it wasn't in the index already. However, this requires more intensive changes to the index than I'm comfortable with right now (DefById only has the metricdef, not the properties; we could add them, but then we need to adjust how we work with DefById everywhere, and do we still need to store the properties in the tree, etc). I'd rather re-address this when the need is clearer and we have time to give it the attention it deserves.
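A minimal sketch of the matchcache idea (names and signatures are illustrative, not the real matchcache API): memoize the expensive pattern match per metric name behind a single lock, which is also where the profiled contention comes from. Sharding the map would reduce that contention.

```go
package main

import (
	"fmt"
	"sync"
)

// Cache memoizes the result of an expensive pattern match
// (schema/aggregation lookup) per metric name, so regexes only run on
// the first datapoint of each series.
type Cache struct {
	sync.Mutex
	data map[string]uint16
}

func NewCache() *Cache {
	return &Cache{data: make(map[string]uint16)}
}

// Get returns the cached index for key, calling match only on a miss.
// The single lock here is what shows up in the cpu profile above.
func (c *Cache) Get(key string, match func(string) uint16) uint16 {
	c.Lock()
	defer c.Unlock()
	if v, ok := c.data[key]; ok {
		return v
	}
	v := match(key)
	c.data[key] = v
	return v
}

func main() {
	calls := 0
	match := func(name string) uint16 { calls++; return 7 }
	c := NewCache()
	c.Get("some.metric", match)
	c.Get("some.metric", match)
	fmt.Println(calls) // 1: the second lookup hit the cache
}
```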
on my laptop:

MASTER:
BenchmarkProcess-8   2000000   718 ns/op
PASS
ok  github.com/raintank/metrictank/input  2.796s

HEAD: go test -run='^$' -bench=.
BenchmarkProcessUniqueMetrics-8   1000000   2388 ns/op
BenchmarkProcessSameMetric-8      2000000    885 ns/op
PASS
ok  github.com/raintank/metrictank/input  6.081s

So we're a bit slower, but carbon input should be a lot faster (for which we don't have benchmarks) since it used to do regex matching all the time.
Force-pushed from 6056bda to 3a6a85e
Manual testing setup
storage-schemas.conf:
storage-aggregation.conf:
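The config files themselves were in collapsed sections that didn't survive here. A graphite-style pair along these lines (all patterns and values hypothetical, purely to illustrate the format exercised by the new per-pattern retention and aggregation code) would look like:

```ini
# storage-schemas.conf (hypothetical values)
[high_res]
pattern = ^servers\..*\.cpu\.
retentions = 1s:1d,10s:30d,10m:1y

[default]
pattern = .*
retentions = 10s:6h

# storage-aggregation.conf (hypothetical values)
[min]
pattern = \.min$
xFilesFactor = 0.1
aggregationMethod = min

[default]
pattern = .*
xFilesFactor = 0.5
aggregationMethod = average
```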
Results verified:
TODO:
currently working on this:
will either try to get something like this working:
or will use mt-store-cat, maybe.
Much cleaner code, if I may say so. Also: consistently apply a simpler "add default to catch all remaining metrics" default mechanism.
This reverts commit 3bae47f. They consume about 1% CPU (flat and cumulative) under heavy ingest load, and they basically just confirm it works exactly like it should. But the commit is here in case someone ever wants to re-apply it.
Force-pushed from 3a6a85e to 8ddd548
Prints how many chunks per series in each table, with TTLs; optionally filter down by metric prefix.
Below are the different series in each table, with TTL in hours (rounded) and how many chunks each has.
One thing I should definitely still test is the same thing as above, but with metrics that fall into the default schema/agg settings.
I can confirm: when I remove the default rules from the storage-aggregation.conf and storage-schemas.conf posted above, it relies on the defaults built into MT: minutely points for 7 days, for all.
item.XFilesFactor, err = strconv.ParseFloat(s.ValueOf("xFilesFactor"), 64)
if err != nil {
return Aggregations{}, fmt.Errorf("[%s]: failed to parse xFilesFactor %q: %s", item.Name, s.ValueOf("xFilesFactor"), err.Error())
Why not return result here instead of instantiating a new Aggregations{}?
I could go either way on this. This code is a bit more explicit that it's an empty value, but you're right, we could reuse result. I don't see a strong reason to prefer either way.
methodStrs := strings.Split(aggregationMethodStr, ",")
for _, methodStr := range methodStrs {
switch methodStr {
case "average", "avg":
Is avg actually an allowed value in whisper? I can't see it here: https://github.com/graphite-project/whisper/blob/f7e05cef81279bde327d8f1501acbf9caf730102/whisper.py#L515
Good point. It's not documented in the config documentation either (http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-aggregation-conf); so I guess metrictank will accept it because it is nice to its users.
return fmt.Errorf("lower resolution retentions must be evenly divisible by higher resolution retentions (%d does not divide by %d)", ret.SecondsPerPoint, prev.SecondsPerPoint)
}

if prev.MaxRetention() >= ret.MaxRetention() {
Shouldn't that be > instead of >=, according to rule 4?
No? Rule 4 says "Lower precision archives must cover larger time intervals than higher precision archives." We assert this by erroring out if the lower precision archive (ret) is smaller than or equal to the higher precision archive (prev).
oh right, all good
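The two checks under discussion can be sketched as a standalone validation. The Retention type here is a simplified stand-in (MaxRetention as secondsPerPoint * numberOfPoints, as in whisper), not metrictank's actual type:

```go
package main

import "fmt"

// Retention is a hypothetical stand-in for the real retention type.
type Retention struct {
	SecondsPerPoint int
	NumberOfPoints  int
}

// MaxRetention is the total timespan covered by the archive.
func (r Retention) MaxRetention() int { return r.SecondsPerPoint * r.NumberOfPoints }

// validate checks that each coarser archive is evenly divisible by the
// finer one, and covers a strictly larger interval (rule 4), which is
// why equal coverage (>=) is rejected.
func validate(retentions []Retention) error {
	for i := 1; i < len(retentions); i++ {
		prev, ret := retentions[i-1], retentions[i]
		if ret.SecondsPerPoint%prev.SecondsPerPoint != 0 {
			return fmt.Errorf("%d does not divide by %d", ret.SecondsPerPoint, prev.SecondsPerPoint)
		}
		if prev.MaxRetention() >= ret.MaxRetention() {
			return fmt.Errorf("archive %d must cover a larger interval than archive %d", i, i-1)
		}
	}
	return nil
}

func main() {
	ok := []Retention{{10, 8640}, {60, 10080}, {600, 52560}} // 1d, 7d, 1y
	fmt.Println(validate(ok) == nil)
	// equal coverage: 10s*60 = 600s vs 60s*10 = 600s, rejected by >=
	bad := []Retention{{10, 60}, {60, 10}}
	fmt.Println(validate(bad) != nil)
}
```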
return item.val
}

func (m *Cache) maintain() {
Wouldn't it maybe make sense to have some way to shut this down? Otherwise, when there are unit tests that initialize many instances of Cache, we'll end up with tons of maintain() goroutines, which also cause the cache not to get freed even though it might already not be used anymore, because it's still referenced by that goroutine.
Could be a simple flag that's checked in the for loop, which makes it return if true.
diff := int64(m.expireAfter.Seconds())
for now := range ticker.C {
nowUnix := now.Unix()
m.Lock()
So this keeps the lock until the for loop has iterated over every single entry in m.data. Are you sure that's not going to lead to long blocks if there's a really large number of metrics? Even though it's only on the ingestion path, it would be quite easy to split it up and check something like len(m.data) / 1000 items every m.cleanInterval / 1000 seconds.
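The suggested batched cleanup could look roughly like this (an illustrative sketch with hypothetical names, not the actual implementation): snapshot the keys once, then process them in small batches, releasing the lock between batches so ingestion is never blocked for long.

```go
package main

import (
	"fmt"
	"sync"
)

type cache struct {
	sync.Mutex
	data map[string]int64 // metric name -> last-seen unix timestamp
}

// cleanupChunked deletes entries older than cutoff in batches, instead
// of holding the lock across a scan of the whole map. A small sleep
// between batches would further spread the work over the clean interval.
func (c *cache) cleanupChunked(cutoff int64, batch int) {
	c.Lock()
	keys := make([]string, 0, len(c.data))
	for k := range c.data {
		keys = append(keys, k)
	}
	c.Unlock()

	for start := 0; start < len(keys); start += batch {
		end := start + batch
		if end > len(keys) {
			end = len(keys)
		}
		c.Lock()
		for _, k := range keys[start:end] {
			// re-check under the lock: the entry may have been
			// refreshed or removed since the snapshot
			if ts, ok := c.data[k]; ok && ts < cutoff {
				delete(c.data, k)
			}
		}
		c.Unlock()
	}
}

func main() {
	c := &cache{data: map[string]int64{"a": 1, "b": 5, "c": 9}}
	c.cleanupChunked(6, 2)
	fmt.Println(len(c.data)) // 1: only "c" survives
}
```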
I see a few problems with this implementation.
So my recommendation is to:
Change the idx.MetricIndex interface to
in MemoryId, change
As metricDefs/metricData are added to the index, look up the SchemaId and AggId. When the schemaId or AggId are needed, they can be fetched from the index efficiently with
Also please
We should then also update the idx.Node type I think, its Defs property should be a
So with that in mind, I'll implement those changes.
Closing in favor of #570
see #463 (comment)
Mostly working on step 3 first. Need to find an elegant way to make the retentions available for every req in alignRequests. Perhaps the index could store them for each idx.Node (i.e. do all the matching when ingesting new points into the index), or the query engine could keep a cache of paths -> retention scheme (i.e. do the matching when data is requested).