
update idx handling #574

Merged
merged 5 commits on Mar 22, 2017

Conversation

@woodsaj (Member) commented Mar 21, 2017

  • support a LastSave property, so we only periodically save to cassandra.
  • make adding to the writeQueue a non-blocking operation, unless the
    def has not been saved for 1.5x the updateInterval (see the sketch below).

replaces PR #569 and PR #571
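
In rough outline, the intended save path looks like this (a minimal sketch, assuming the writeQueue, writeReq, LastSave and updateInterval names visible in the diffs below; the real code differs in its details):

now := uint32(time.Now().Unix())
if archive.LastSave < now-updateInterval {
	// the def is due for a save: try a non-blocking send to the write queue first
	select {
	case c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}:
		archive.LastSave = now
	default:
		// queue full. Only fall back to a blocking send once the def is badly
		// overdue, i.e. has not been saved for 1.5x the updateInterval.
		if archive.LastSave < now-updateInterval-updateInterval/2 {
			c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
			archive.LastSave = now
		}
	}
}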

@woodsaj (Member Author) commented Mar 21, 2017

Insert benchmarks

new code

$ go test -v -run NONE -bench . -benchtime 1s
BenchmarkIndexing-4       30000	     51445 ns/op	    3518 B/op	      62 allocs/op
BenchmarkLoad-4            200000	      9355 ns/op	    1745 B/op	      14 allocs/op

current master

(The current benchmarks in master don't work, as they don't write to cassandra. So these results are from local modifications to set updateCassIdx=true.)

$ go test -v -run NONE -bench . -benchtime 1s
BenchmarkIndexing-4   	   30000	     49439 ns/op	    3518 B/op	      61 allocs/op
BenchmarkLoad-4       	  200000	      9384 ns/op	    1744 B/op	      14 allocs/op

Results are pretty much identical because all writes are blocking writes to cassandra.

Update benchmarks

new code

$ go test -v -run NONE -bench BenchmarkIndexingWithUpdates
BenchmarkIndexingWithUpdates-4   	  500000	      2376 ns/op	     760 B/op	      14 allocs/op

current master

(master does not currently have the BenchmarkIndexingWithUpdates benchmark, so these results are from local edits to add it.)

$ go test -v -run NONE -bench BenchmarkIndexingWithUpdates
BenchmarkIndexingWithUpdates-4   	   30000	     48469 ns/op	    2693 B/op	      51 allocs/op

When performing updates that don't require a save to cassandra, the new code is way faster.

- support LastSave property, so we only periodically save to cassandra.
- make adding to the writeQueue a non-blocking operation, unless the
  def has not been updated for 1.5x the updateInterval.
log.Debug("cassandra-idx updating def in index.")
c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
archive.LastSave = now
c.MemoryIdx.Update(archive)
Contributor

Should we maybe collect some statistics that track what percentage of the updates happen due to the if condition being met and how many happen via the non-blocking writes? That might be a good indicator to look at if we seem to hit a writes-per-time limitation.

Member Author

That is a good idea.

Member Author

I added a counter idx.cassandra.save.skipped to keep track of how many saves are being skipped due to the writeQ being full. Spikes in this counter would be normal, but continued growth over an extended time would indicate a performance problem.
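
The skipped-save accounting then lives in the default branch of the select, roughly like this (a sketch; statSaveSkipped is a placeholder for whatever counter type the stats package provides, registered under idx.cassandra.save.skipped):

select {
case c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}:
	archive.LastSave = now
default:
	// the queue is full and the def is not yet overdue, so skip this save.
	// Spikes here are fine; sustained growth means the write queue can't keep up.
	statSaveSkipped.Inc()
}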

@@ -251,10 +251,8 @@ max-stale = 0
prune-interval = 3h
# synchronize index changes to cassandra. not all your nodes need to do this.
update-cassandra-index = true
#frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates
#frequency at which we should update flush changes to cassandra. only relevent if update-cassandra-index is true.
Contributor

relevant. (sorry)

Member Author

fixed.

@@ -132,6 +142,9 @@ func (m *MemoryIdx) Load(defs []schema.MetricDefinition) int {
continue
}
m.add(def)
// as we are loading the metricDefs from a persistant store, set the lastSave
// to the lastUpdate timestamp.
Contributor

this comment says exactly what the code does but it should say why it does it, and should explain why this is OK

m.Unlock()
return
}
*(m.DefById[entry.Id]) = entry
Contributor

couldn't we just write m.DefById[entry.Id] = &entry here? why not?

Member Author

This is safer. Another goroutine could already have a copy of the reference. If we just changed the address of what m.DefById[entry.Id] pointed to rather than the content, then any modifications made by those other goroutines would be lost.
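
A runnable toy example of the difference (made-up names, not the index code):

package main

import "fmt"

type def struct{ LastUpdate int64 }

func main() {
	byId := map[string]*def{"a": {LastUpdate: 1}}

	held := byId["a"] // another goroutine grabbed this pointer earlier

	// overwrite the pointed-to value: every holder of the pointer sees the change
	*(byId["a"]) = def{LastUpdate: 2}
	fmt.Println(held.LastUpdate) // 2

	// swap in a new pointer instead: held still points at the old struct,
	// so it no longer reflects what the map entry holds
	byId["a"] = &def{LastUpdate: 3}
	fmt.Println(held.LastUpdate) // still 2
}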

archive.MetricDefinition.Partition = partition
}

// if the entry has not been saved for 1.5x updateInterval
Contributor

why do we use 1.5x the interval? wouldn't it make more sense (and be easier to reason about) to start doing blocking writes at exactly the 1 updateInterval mark?

BTW the compiler should optimize uint divisions by factors of two, I don't think we need to do it manually?

Member Author

No. Writes aren't attempted until at least the updateInterval has passed. If you forced a blocking write at exactly 1 updateInterval, then you would only ever try the non-blocking write once, forcing all saves to be completed within 2x your metric interval. That is way too aggressive.

Member Author

As for why 1.5x: because that is what we have been using in hosted-metrics, i.e. an updateFuzziness of 0.5, leading to updates happening between updateInterval and 1.5x updateInterval.
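
Roughly, that fuzz means each def gets an effective deadline somewhere inside that window (a hypothetical illustration of the idea, not code from this PR):

// with updateFuzziness = 0.5, a def becomes due for a save at some point between
// updateInterval and 1.5x updateInterval after its last save, which spreads the
// writes out instead of making every def due at exactly updateInterval.
fuzz := uint32(rand.Float64() * 0.5 * float64(updateInterval))
dueAt := archive.LastSave + updateInterval + fuzz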

// This is just a safety precaution to prevent corrupt index entries.
// This ensures that the index entry always contains the correct metricDefinition data.
if inMemory {
archive.MetricDefinition = *schema.MetricDefinitionFromMetricData(data)
@Dieterbe (Contributor), Mar 21, 2017

note that the id is generated from (almost) all the properties. Properties not included in the id are Partition and LastUpdate (and Name, but Name should always be the same as Metric so it's not relevant here; that's still something we have to clean up at some point btw). MemoryIdx.AddOrUpdate already made sure to update Partition and LastUpdate, so I see no need for this.

Member Author

Contributor

aha ok. so after this has run for a while, at some point we'll be able to take out these lines again?

Member Author

Yes. Unless we introduce metadata fields in the future that do not contribute to the generated id.

// This ensures that the index entry always contains the correct metricDefinition data.
if inMemory {
archive.MetricDefinition = *schema.MetricDefinitionFromMetricData(data)
archive.MetricDefinition.Partition = partition
Contributor

MemoryIdx.AddOrUpdate already made sure to update/set the Partition field correctly?

@Dieterbe (Contributor)

replaces PR #569 and PR #571

571 also had a change to make the index Pruning use LastSave instead of LastUpdate. I think we should still apply that, as it seems to make more sense

@woodsaj (Member Author) commented Mar 21, 2017

LastSave can't be used for pruning, as not all nodes save metricDefs. So if updateCassIdx=false, then LastSave will always be what was set when the def was loaded at startup.

@Dieterbe (Contributor) commented Mar 21, 2017

For nodes that don't write to cassandra, we could just make it so that LastSave means "last save to the memory index" instead of "last save to the cassandra index", in other words just have it be the timestamp of when the def was last seen. That way we could do this. But anyway, I don't feel strongly about it; I don't have a good example to make a strong case for it (other than people for some reason loading in old data, which doesn't seem very common), so we can keep it as is.

@woodsaj (Member Author) commented Mar 21, 2017

For hosted-metrics the default MAX_STALE is 48 hours. So users would have to be streaming data with a delay of 48 hours (as LastUpdate is now set for every point received, instead of the previous periodic saving of lastUpdate). It is extremely unlikely anyone will send data with that much lag, and if they do we can just change MAX_STALE or turn pruning off completely.
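
So the prune check stays based on LastUpdate, which with the new per-point updates boils down to something like this (a sketch, with maxStale standing in for the max-stale / MAX_STALE setting):

// a def is a prune candidate only if no data point has been received for maxStale
// (48 hours by default in hosted-metrics); LastUpdate now advances with every point.
if def.LastUpdate < time.Now().Add(-maxStale).Unix() {
	// prune this def from the index
}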

// lastSave timestamp become more then 1.5 x UpdateInterval, in which case we will
// do a blocking write to the queue.
select {
case c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}:
@replay (Contributor), Mar 21, 2017

shouldn't time.Unix(now, 0) be faster than time.Now()?

Member Author

Faster? Yes. But as we use this for measuring how long items spend in the queue, time.Unix(now, 0) does not provide the required precision.
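
In other words (a sketch): recvTime feeds a queue-latency measurement, which needs sub-second resolution.

req := writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
// ... later, when the request is taken off the queue:
queueTime := time.Since(req.recvTime) // nanosecond resolution
// time.Unix(now, 0) would be cheaper to construct, but `now` is a whole-second
// unix timestamp, so any queue time under a second would collapse to roughly zero.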

OrgId: 1,
Time: 10,
}
data.SetId()
Contributor

now that an iteration is on the order of nanoseconds, the overhead of the Itoa call, instantiating data, and calling SetId is probably starting to get significant. Maybe use StopTimer and StartTimer to take this stuff out of the equation.
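
i.e. something along these lines (a sketch; buildMetricData is a hypothetical helper wrapping the setup shown above, and the AddOrUpdate call follows the usage discussed in this thread):

for n := 0; n < b.N; n++ {
	b.StopTimer()
	// keep the Itoa call, the MetricData construction and SetId() out of the
	// measured time, since each indexed update is now only a few microseconds
	data := buildMetricData(n) // hypothetical helper
	b.StartTimer()
	ix.AddOrUpdate(data, 1)
}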

@@ -285,3 +387,46 @@ func BenchmarkLoad(b *testing.B) {
ix.Init()
ix.Stop()
}

func BenchmarkIndexingWithUpdates(b *testing.B) {
Contributor

It seems like the goal of this test, due to it calling insertDefs(ix, b.N) first, is to measure how long AddOrUpdate takes if and only if no update needs to happen. The name of this benchmark should reflect that. Why is it called WithUpdates?

@woodsaj (Member Author), Mar 21, 2017

benchmarking when the metrics inserted are updates, not adds.

Contributor

Right, but it's not updating anything in cassandra. And since this test is called WithUpdates and it's in idx/cassandra/cassandra_test.go, this leads one to believe it benchmarks updates in cassandra. So maybe just call it BenchmarkIndexingUpdatesMemoryNotCassandra or something.

@woodsaj (Member Author), Mar 21, 2017

Your suggestion makes less sense.
The benchmark calls CasIdx.AddOrUpdate() with items that only update. So "BENCHMARK INDEXING WITH metricData payloads that result in UPDATES" == BenchmarkIndexingWithUpdates.

- create the metricData to be added outside of the main loop.

b.ReportAllocs()
b.ResetTimer()
updates := make([]*schema.MetricData, b.N)
Contributor

By pre-allocating you could exhaust RAM. I've run into this a few times, hence the suggestion to use StopTimer and StartTimer. But anyway, I guess we'll see if it happens.

@Dieterbe (Contributor)

btw to compare benchmarks, these tools are nice:
https://godoc.org/golang.org/x/tools/cmd/benchcmp
https://godoc.org/golang.org/x/perf/cmd/benchstat
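
For example, benchstat compares repeated runs of the same benchmarks and reports whether the delta is statistically significant:

$ go test -run NONE -bench . -count 10 > old.txt   # on master
$ go test -run NONE -bench . -count 10 > new.txt   # on this branch
$ benchstat old.txt new.txt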
