Skip to content

Commit

Permalink
Use previous value for cumulative resets (#145)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmacd authored Mar 5, 2021
1 parent daaeaef commit cc72749
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 7 deletions.
24 changes: 17 additions & 7 deletions retrieval/series_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,21 @@ type seriesCacheEntry struct {
suffix string
hash uint64

hasReset bool
// Whether the series has been reset/initialized yet. This is false only for
// the first sample of a new series in the cache, which causes the initial
// "reset". After that, it is always true.
hasReset bool

// The value and timestamp of the latest reset. The timestamp is when it
// occurred, and the value is what it was reset to. resetValue will initially
// be the value of the first sample, and then 0 for every subsequent reset.
resetValue float64
resetTimestamp int64

// Value of the most recent point seen for the time series. If a new value is
// less than the previous, then the series has reset.
previousValue float64

// maxSegment indicates the maximum WAL segment index in which
// the series was first logged.
// By providing it as an upper bound, we can safely delete a series entry
Expand Down Expand Up @@ -281,24 +293,22 @@ func (c *seriesCache) getResetAdjusted(ref uint64, t int64, v float64) (int64, f
if !hasReset {
e.resetTimestamp = t
e.resetValue = v
e.previousValue = v
// If we just initialized the reset timestamp, this sample should be skipped.
// We don't know the window over which the current cumulative value was built up over.
// The next sample for will be considered from this point onwards.
return 0, 0, false
}
if v < e.resetValue {
if v < e.previousValue {
// If the value has dropped, there's been a reset.
// If the series was reset, set the reset timestamp to be one millisecond
// before the timestamp of the current sample.
// We don't know the true reset time but this ensures the range is non-zero
// while unlikely to conflict with any previous sample.
e.resetValue = 0
e.resetTimestamp = t - 1
} else if e.resetTimestamp >= t {
// TODO: This case is problematic and typically
// results in some kind of data validation error. An
// out-of-order WAL entry?
// https://github.com/lightstep/opentelemetry-prometheus-sidecar/issues/84
}
e.previousValue = v
return e.resetTimestamp, v - e.resetValue, true
}

Expand Down
65 changes: 65 additions & 0 deletions retrieval/series_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,3 +416,68 @@ func TestSeriesCache_RenameMetric(t *testing.T) {
t.Fatalf("want proto metric name %q but got %q", want, entry.desc.Name)
}
}

func TestSeriesCache_ResetBehavior(t *testing.T) {
// Test the fix in
// https://github.com/Stackdriver/stackdriver-prometheus-sidecar/pull/263
logBuffer := &bytes.Buffer{}
defer func() {
if logBuffer.Len() > 0 {
t.Log(logBuffer.String())
}
}()
logger := log.NewLogfmtLogger(logBuffer)
extraLabels := labels.FromStrings("__resource_a", "resource2_a")
metadataMap := metadataMap{
"job1/inst1/metric1": &metadata.Entry{Metric: "metric1", MetricType: textparse.MetricTypeGauge, ValueType: metadata.DOUBLE},
}
c := newSeriesCache(logger, "", nil, nil, metadataMap, "", extraLabels)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const refID = 1

if err := c.set(ctx, refID, labels.FromStrings("__name__", "metric1", "job", "job1", "instance", "inst1"), 5); err != nil {
t.Fatalf("unexpected error: %s", err)
}

_, ok, err := c.get(ctx, refID)
require.NoError(t, err)
require.True(t, ok)

type kase struct {
ts int64
value float64
reset int64
cumulative float64
ok bool
}

const pad = 1

// Simulate two resets.
for i, k := range []kase{
{1, 10, 1, 0, false},
{2, 20, 1, 10, true},
{3, 30, 1, 20, true},
{4, 40, 1, 30, true},

{5, 5, 5 - pad, 5, true},
{6, 10, 5 - pad, 10, true},
{7, 15, 5 - pad, 15, true},

{8, 0, 8 - pad, 0, true},
{9, 10, 8 - pad, 10, true},
} {
ts, val, ok := c.getResetAdjusted(refID, k.ts, k.value)

require.Equal(t, k.ok, ok, "%d", i)

if !ok {
continue
}
require.Equal(t, k.reset, ts, "%d", i)
require.Equal(t, k.cumulative, val, "%d", i)
}
}

0 comments on commit cc72749

Please sign in to comment.