
Whisper importer aggregate conversion #712

Merged: 40 commits merged into master from importer_agg_conversion_plan on Aug 31, 2017

Conversation

@replay (Contributor) commented Aug 23, 2017

This adds functionality to the whisper-importer-reader which allows it to convert input data to the required schema on the fly while importing.
It expects a schema file as a parameter, in the same format that MT uses. It then reads the whisper files and generates the required retentions, using the highest resolution available as input. In cases where a generated retention has a higher resolution than what's available in the whisper files, it "fakes" the higher resolution by "fake-undoing" the aggregation mechanism.

To demonstrate this fake-increasing of the resolution I've used some data that we have stored in MT (ops cluster) and that is also stored as whisper files with differing schemas (ziggurat). I then imported those whisper files into another MT which uses a different schema, which makes it possible to compare the data via two different datasources.

Fixes #710
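
To make the "fake-undoing" concrete, here is a minimal sketch of the idea for sum-aggregated archives; the names are illustrative, not the actual importer code:

```go
package main

import "fmt"

type point struct {
	ts  uint32
	val float64
}

// fakeIncResolutionSum spreads each sum-aggregated point over the
// higher-resolution intervals it covers, dividing the value so that the
// generated points sum back up to the input point.
func fakeIncResolutionSum(in []point, inRes, outRes uint32) []point {
	var out []point
	factor := float64(inRes) / float64(outRes)
	for _, p := range in {
		// the input point covers (p.ts-inRes, p.ts]; emit one point
		// per outRes step inside that range
		for ts := p.ts - inRes + outRes; ts <= p.ts; ts += outRes {
			out = append(out, point{ts: ts, val: p.val / factor})
		}
	}
	return out
}

func main() {
	// a minutely statsd count of 120 becomes six 10s points of value 20
	fmt.Println(fakeIncResolutionSum([]point{{ts: 60, val: 120}}, 60, 10))
}
```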

@replay changed the title from "Importer agg conversion plan" to "Whisper importer aggregate conversion" on Aug 23, 2017
@Dieterbe (Contributor) left a comment

> It expects a schema file as a parameter, in the same format that MT uses.

practically, this will be the file that will be used by MT (HM), right? or is it the file that describes the whisper files?

does this follow the description of #710 exactly, but also with merging of different archives, I guess?

> If there is only an archive with higher resolution and at least the same TTL, it can use the defined consolidateBy function to decrease the resolution.

what happens if there's a whisper archive in 10s resolution but we want 15s?

> If there is only an archive with lower resolution that satisfies the TTL, it will have to repeat each input point factor-times.

this works for gauges, rates, counters etc. but is problematic for statsd counts. e.g. a count can have a minutely value of 120, which means "we counted 120 occurrences over this minutely interval". if the points have to be secondly, their values should average to 2, not be 120 every second.

"cnt": a.cnt,
"lst": a.lst,
"avg": a.sum / a.cnt,
}
Contributor

constructing a new map just to return some values seems rather gross and inefficient.
the simplest alternative, I think, is to just make those members public, and maybe add a comment on the type that the members shouldn't be changed by other packages (rather obvious, but still).
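
A sketch of the suggested refactor (illustrative names, not the PR's actual types):

```go
package agg

// before: a private struct whose values are exposed by allocating a
// fresh map on every call
type aggregator struct {
	cnt, lst, sum float64
}

func (a *aggregator) values() map[string]float64 {
	return map[string]float64{
		"cnt": a.cnt,
		"lst": a.lst,
		"avg": a.sum / a.cnt,
	}
}

// after: exported fields, no per-call allocation.
// Cnt, Lst and Sum must not be modified by other packages.
type Aggregator struct {
	Cnt, Lst, Sum float64
}

func (a *Aggregator) Avg() float64 { return a.Sum / a.Cnt }
```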

Contributor Author

Fixed in the latest commit

if _, ok := readArchives[archiveIdx]; !ok && len(readArchives) > 0 {
continue
}
if !nameFilter.Match([]byte(name)) {
Contributor

why is this filter used so late? seems like we can do the filtering before we even open/read the file, and before we even put the filename on the channel.

Contributor Author

Changed that in the latest commit

Contributor

why is it still after opening the file and after putting it on the channel?

Contributor Author

moved it to before pushing the name into the chan

@replay (Contributor Author) commented Aug 25, 2017

@Dieterbe The format is exactly the same as the MT schema definition. I did that intentionally to make it as easy to use as possible: just give it the MT config file.

It will correctly convert from 10s to 15s too, as tested here: https://github.com/raintank/metrictank/pull/712/files#diff-7218fd8d8872a6d3f498aa9f1f42be63R303

Depending on the aggregation mechanism it decides how to increase the resolution of the data. If the aggregation mechanism was sum then it will divide by the corresponding factor, so 120 would result in 2: https://github.com/raintank/metrictank/pull/712/files#diff-c3e64180d5dd147d8ea6691375b144c9R131

@replay (Contributor Author) commented Aug 28, 2017

@Dieterbe the logic follows the description on #710. it is important to note, though, that the input archive is selected separately for each generated datapoint, so it is possible that one generated rollup has had multiple inputs, for example in a case like this:

we want:
SecondsPerPoint: 60
TTL: 30d

we have
0: 
SecondsPerPoint: 60
TTL: 7d
1:
SecondsPerPoint: 3600
TTL: 30d

^^ In such a case the importer would "mix" the two inputs into one
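
A sketch of that per-point archive selection (illustrative names; archives are assumed to be ordered from highest to lowest resolution):

```go
package importer

type archiveInfo struct {
	secondsPerPoint uint32
	ttl             uint32
}

// archiveForTs returns the index of the highest-resolution archive whose
// TTL still covers the given timestamp; output points older than the
// first archive's TTL fall through to a lower-resolution archive.
func archiveForTs(ts, now uint32, archives []archiveInfo) int {
	for i, a := range archives {
		if now-ts <= a.ttl {
			return i
		}
	}
	return len(archives) - 1
}
```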

currentBoundary = boundary
agg.Add(inPoint.Value)
}
}
Contributor

should we make sure to flush any unflushed points? can you add a comment as to why (not)?

@replay (Contributor Author) Aug 28, 2017

I'll mention that in a comment too:
Generally we only want to write aggregated points that have been "finished", as in we have received a point with a ts which is >= the highest ts that would be factored into this aggregated point. In any other case the result would likely be wrong, because we can't predict the future.

@replay force-pushed the importer_agg_conversion_plan branch from be33c98 to 3c4104e (August 28, 2017 12:56)
@replay force-pushed the importer_agg_conversion_plan branch from 3c4104e to a7d7d37 (August 28, 2017 12:57)
return out
}

func decResolution(points []whisper.Point, methods []string, inRes, outRes uint32) map[string][]whisper.Point {
Contributor

can you document these functions? it's easy to confuse "decrease resolution" with "decrease interval", whereas it's the opposite: we're increasing the interval here.

sidenote:
https://www.google.be/search?q=resolution&oq=resolution&aqs=chrome..69i57j0l5.1000j0j7&sourceid=chrome&ie=UTF-8 has 1 applicable definition for resolution:

the smallest interval measurable by a telescope or other scientific instrument; the resolving power.
the degree of detail visible in a photographic or television image.

these definitions also seem inconsistent to me, as they use resolution for both "interval" and "degree of detail", but the lower the interval, the higher the degree of detail.

and we also speak of "high resolution screens" to talk about smaller intervals/pixels, so either way of phrasing seems correct; it's just worth commenting what exactly the functions do.

Contributor Author

actually I think "decrease interval" is kind of ambiguous, because strictly speaking it increases the resolution, as you say, but I think many people might misunderstand it as decreasing the resolution.

Contributor

since resolution is not well defined and is ambiguous, I don't understand what you're saying.
but I have no trouble believing that people can misunderstand interval too - even though it's more clearly defined than resolution - so that's why I'm saying to just document the functions better.

continue
}

rangeEnd := inPoint.Timestamp - (inPoint.Timestamp % outRes)
@Dieterbe (Contributor) Aug 28, 2017

correct me if i'm wrong:

  1. points in whisper are quantized (e.g. for a res of 30s, timestamps in whisper are always divisible by 30 without remainder)
  2. thus, all points divide by inRes without remainder
  3. outRes < inRes
  4. all points divide by outRes without remainder except when outRes doesn't fit evenly in inRes (e.g. outRes is 10 and inRes 15)
  5. rangeEnd always equals inPoint.Timestamp except when outRes doesn't fit evenly in inRes (e.g. outRes is 10 and inRes 15)

if this is correct, worth pointing out i think
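
A quick illustration of points 4 and 5 with inRes=15, outRes=10 (hypothetical values, just to show when rangeEnd diverges from the timestamp):

```go
package main

import "fmt"

func main() {
	inRes, outRes := uint32(15), uint32(10)
	// whisper timestamps are quantized to inRes, so step by inRes
	for ts := inRes; ts <= 60; ts += inRes {
		rangeEnd := ts - ts%outRes
		fmt.Printf("ts=%2d rangeEnd=%2d equal=%v\n", ts, rangeEnd, rangeEnd == ts)
	}
	// output:
	// ts=15 rangeEnd=10 equal=false
	// ts=30 rangeEnd=30 equal=true
	// ts=45 rangeEnd=40 equal=false
	// ts=60 rangeEnd=60 equal=true
}
```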

@replay (Contributor Author) Aug 28, 2017

I'm not sure if I'm misunderstanding what you mean, but don't your point 4) and point 5) conflict with each other?
If inRes is 15 and outRes is 10 then some points will not divide by outRes without remainder (which is fine).

Contributor

i updated point 4; it had a mistake in there.
i don't have any particular point other than that I think 5 is an interesting observation that is probably worth documenting (if i'm correct).

Contributor Author

added comments

Contributor

your comment says "outRes is > inRes", that's wrong, no?

Contributor Author

lol right, total confusion

outData := incResolution(inData, method, inRes, outRes, rawRes)

if len(expectedResult) != len(outData) {
t.Fatalf("Generated data is not as expected:\n%+v\n%+v", outData, expectedResult)
@Dieterbe (Contributor) Aug 28, 2017

when this dumps both structures, it's not clear which one was expected and which one we got.
maybe better something like

t.Fatalf("testIncResolution.\nExpected:\n%+v\nGot:\n%+v\n", expectedResult, outData)

Contributor

ditto for the other ones btw

Contributor Author

Fixed

{0, 0},
{10, 10},
{0, 0},
{0, 0},
Contributor

what exactly is a gap here? as in null data? is this how whisper returns that? a 0 ts and 0 value?

Contributor Author

yes, because whisper preallocates:

$> whisper-create.py whisperfile.wsp 1:6
$> whisper-update.py ./whisperfile.wsp $(date +%s):1
$> whisper-dump.py whisperfile.wsp 
Meta data:
  aggregation method: average
  max retention: 6
  xFilesFactor: 0.5

Archive 0 info:
  offset: 28
  seconds per point: 1
  points: 6
  retention: 6
  size: 72

Archive 0 data:
0: 1504002715,          1
1: 0,          0
2: 0,          0
3: 0,          0
4: 0,          0
5: 0,          0

testDecResolution(t, inData, expectedResult, []string{"avg"}, 10, 30)
}

func testDecResolution(t *testing.T, inData []whisper.Point, expectedResult map[string][]whisper.Point, methods []string, inRes, outRes uint32) {
Contributor

why is this function placed right in the middle of all its callers? maybe move it to the top, right under testIncResolution

Contributor Author

I moved it right above its call sites

func TestRowKeyAgg1(t *testing.T) {
res := getRowKey(1, "aaa", "sum", 60)
if res != "aaa_sum_60" {
t.Fatalf("row key for aggregation 0 should equal the id")
Contributor

incorrect. maybe use expected - got format here also.
in fact i would merge these 2 tests into one, and do it table-based (sketched below).
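
A possible table-based version (hedged sketch: it assumes getRowKey's signature from the snippet above, and that aggregation 0 returns the bare id, as the existing failure message suggests):

```go
func TestGetRowKey(t *testing.T) {
	cases := []struct {
		aggNum   int
		id       string
		method   string
		interval uint32
		expected string
	}{
		{0, "aaa", "sum", 60, "aaa"},        // aggregation 0: row key equals the id
		{1, "aaa", "sum", 60, "aaa_sum_60"}, // rollup: id_method_interval
	}
	for _, c := range cases {
		if got := getRowKey(c.aggNum, c.id, c.method, c.interval); got != c.expected {
			t.Fatalf("row key: expected %q got %q", c.expected, got)
		}
	}
}
```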

}

func TestEncodedChunksFromPointsWithoutUnfinished(t *testing.T) {
points := generatePoints(25200, 10, 10, 0, 8640, func(i float64) float64 { return i + 1 })
@Dieterbe (Contributor) Aug 29, 2017

can you comment wrt the (in)significance of these input numbers

for iter.Next() {
ts, val := iter.Values()
if points[i].Timestamp != ts || points[i].Value != val {
t.Fatalf("Unexpected value at index %d: %d:%f instead of %d:%f", i, ts, val, points[i].Timestamp, points[i].Value)
Contributor

let's use expected: %d:%f got: %d:%f syntax here also for consistency.

}
i++
}
}
Contributor

this needs a check to confirm that we actually iterated over all points (if iter.Next() didn't return any points, we wouldn't notice now)
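
For example (a sketch that would go right after the loop shown above, reusing its i counter and points slice):

```go
if i != len(points) {
	t.Fatalf("expected to iterate over %d points, got %d", len(points), i)
}
```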

@replay force-pushed the importer_agg_conversion_plan branch from dd45d6f to 148aec8 (August 29, 2017 13:17)
@replay (Contributor Author) commented Aug 29, 2017

@Dieterbe I think every one of your comments has been dealt with now

log(fmt.Sprintf("Processing file %q", file))
met, err := getMetric(w, file)
name := getMetricName(file)
log.Info(fmt.Sprintf("Processing file %s (%s)", file, name))
Contributor

can use log.Infof (also elsewhere where you use log.Info(fmt.Sprintf(...)))
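
i.e., assuming the logger in use exposes the usual Printf-style variant:

```go
// instead of log.Info(fmt.Sprintf("Processing file %s (%s)", file, name))
log.Infof("Processing file %s (%s)", file, name)
```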

rawRes := c.archives[0].SecondsPerPoint

adjustedPoints := make(map[string]map[uint32]float64)
if retIdx > 0 && c.method == "avg" || c.method == "sum" {
Contributor

i think we should take the desired aggregations into account.
if whisper has sum, and we only desire sum, then why generate and store cnt?
in fact, this goes for everything here: for example, if whisper had max, but what's desired for MT is min, then why convert and store max into MT?

Contributor Author

So far my approach was to import whatever data we can get out of the whisper files. If min is desired but the whisper file contains max, then we simply do not have the data required to satisfy what's desired; but we do have max, so let's import that.

@replay (Contributor Author) Aug 29, 2017

If the whisper file contains a summed aggregation and we can get the cnt for free which then allows us to also display avg, then why not take it?

Contributor

perhaps it's best in this case to issue warnings. after all, if whisper had a rollup that is now no longer enabled, then maybe we should warn the user that they should probably enable that rollup in MT as well. otherwise they might get surprised if at first things seem to work (e.g. data imported from whisper is readable) but later they find out that HM/MT is not configured to store an aggregation they wanted.

Contributor Author

Do I understand you right that we should issue a warning if the method has been set to sum but we additionally generate the cnt? Ok, I can do that.
Actually, at the time of the import we don't know what's enabled/disabled in the metrictank config unless we ask the user for the storage-aggregations.conf too. I assumed that most average users just have all aggregation methods enabled, so I figured I'd just fill as many of them as I can based on the data from whisper.

@Dieterbe (Contributor) Aug 30, 2017

yes, we would need to look at both files. I think for HM, where aggregation methods are supposed to be carefully crafted to balance a great experience vs storage cost vs $$ cost, it would make sense to warn if there's data in a whisper archive that is not set up for HM. at the very least, it's an incentive to make sure the storage-schemas for HM is as desired, without forgetting a needed aggregation. maybe @woodsaj or @nopzor1200 should chip in here.

@replay (Contributor Author) Aug 30, 2017

I don't think that's necessary. If a user has whisper files that contain max aggregations and then configures their MT to only keep min, they'll see the imported data when they choose consolidateBy(max) and the new data when they consolidateBy(min), which makes sense and is probably a very rare case anyway. Asking the user to also provide their storage-aggregations.conf just to be able to warn them if they potentially might have misconfigured something is on average probably a worse user experience, not a better one, because the vast majority is just going to have all aggregations on.

Contributor

> Asking the user to also provide their storage-aggregations.conf just to be able to warn them

we are the ones constructing this file, at least for the HM use case.

Contributor Author

Yeah, so then we'll need to make the user download two files and point the tool at both of them, etc. I'd prefer to keep the process as simple and smooth as possible, so it doesn't seem justified to have to deal with an extra file just to display a warning.

@Dieterbe (Contributor)

i'm not entirely sold on the "just import everything even if it's not a configured aggregation" approach, but we can always revise this later if needed.

@Dieterbe Dieterbe merged commit ee4db35 into master Aug 31, 2017
@Dieterbe Dieterbe deleted the importer_agg_conversion_plan branch September 18, 2018 09:00