Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: downsample aggregated metrics #13449

Merged
merged 5 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,10 @@ pattern_ingester:
# CLI flag: -pattern-ingester.metric-aggregation.log-push-observations
[log_push_observations: <boolean> | default = false]

# How often to downsample metrics from raw push observations.
# CLI flag: -pattern-ingester.downsample-period
[downsample_period: <duration> | default = 10s]

# The index_gateway block configures the Loki index gateway server, responsible
# for serving index queries without the need to constantly interact with the
# object store.
Expand Down
18 changes: 18 additions & 0 deletions pkg/pattern/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func (i *Ingester) Flush() {

func (i *Ingester) flush(mayRemoveStreams bool) {
i.sweepUsers(true, mayRemoveStreams)
i.downsampleMetrics(model.Now())

// Close the flush queues, to unblock waiting workers.
for _, flushQueue := range i.flushQueues {
Expand Down Expand Up @@ -73,3 +74,20 @@ func (i *Ingester) sweepInstance(instance *instance, _, mayRemoveStreams bool) {
return true, nil
})
}

func (i *Ingester) downsampleMetrics(ts model.Time) {
instances := i.getInstances()

for _, instance := range instances {
i.downsampleInstance(instance, ts)
}
}

func (i *Ingester) downsampleInstance(instance *instance, ts model.Time) {
_ = instance.streams.ForEach(func(s *stream) (bool, error) {
instance.streams.WithLock(func() {
s.Downsample(ts)
})
return true, nil
})
}
33 changes: 27 additions & 6 deletions pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"google.golang.org/grpc/health/grpc_health_v1"

ring_client "github.com/grafana/dskit/ring/client"
Expand Down Expand Up @@ -206,13 +207,33 @@ func (i *Ingester) loop() {
flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j)
defer flushTicker.Stop()

for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)
if i.cfg.MetricAggregation.Enabled {
downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod)
defer downsampleTicker.Stop()

case <-i.loopQuit:
return
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)

case t := <-downsampleTicker.C:
downsampleTicker.Reset(i.cfg.MetricAggregation.DownsamplePeriod)
now := model.TimeFromUnixNano(t.UnixNano())
i.downsampleMetrics(now)

case <-i.loopQuit:
return
}
}
} else {
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)

case <-i.loopQuit:
return
}
}
}
}
Expand Down
129 changes: 124 additions & 5 deletions pkg/pattern/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -39,6 +40,16 @@ func setup(t *testing.T) *instance {
return inst
}

func downsampleInstance(inst *instance, tts int64) {
ts := model.TimeFromUnixNano(time.Unix(tts, 0).UnixNano())
_ = inst.streams.ForEach(func(s *stream) (bool, error) {
inst.streams.WithLock(func() {
s.Downsample(ts)
})
return true, nil
})
}

func TestInstancePushQuery(t *testing.T) {
inst := setup(t)
err := inst.Push(context.Background(), &push.PushRequest{
Expand All @@ -55,6 +66,7 @@ func TestInstancePushQuery(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, 20)

err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
Expand All @@ -70,6 +82,7 @@ func TestInstancePushQuery(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, 30)

for i := 0; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Expand All @@ -87,6 +100,7 @@ func TestInstancePushQuery(t *testing.T) {
})
require.NoError(t, err)
}
downsampleInstance(inst, 30)

it, err := inst.Iterator(context.Background(), &logproto.QueryPatternsRequest{
Query: "{test=\"test\"}",
Expand Down Expand Up @@ -115,6 +129,9 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
},
})
require.NoError(t, err)
downsampleInstance(inst, 0)

for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
Expand All @@ -130,8 +147,8 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, int64(20*i))
}
require.NoError(t, err)

expr, err := syntax.ParseSampleExpr(`count_over_time({test="test"}[20s])`)
require.NoError(t, err)
Expand All @@ -149,10 +166,11 @@ func TestInstancePushQuerySamples(t *testing.T) {

require.Equal(t, lbls.String(), res.Series[0].GetLabels())

// end - start / step -- (start is 0, step is 10s)
// end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals)
expectedDataPoints := ((20 * 30) / 10)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))
require.Equal(t, float64(1), res.Series[0].Samples[0].Value)
require.Equal(t, float64(1), res.Series[0].Samples[expectedDataPoints-1].Value)

expr, err = syntax.ParseSampleExpr(`count_over_time({test="test"}[80s])`)
require.NoError(t, err)
Expand All @@ -170,7 +188,7 @@ func TestInstancePushQuerySamples(t *testing.T) {

require.Equal(t, lbls.String(), res.Series[0].GetLabels())

// end - start / step -- (start is 0, step is 10s)
// end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals)
expectedDataPoints = ((20 * 30) / 10)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))

Expand All @@ -187,6 +205,101 @@ func TestInstancePushQuerySamples(t *testing.T) {
require.Equal(t, float64(4), res.Series[0].Samples[expectedDataPoints-1].Value)
})

t.Run("test count_over_time samples with downsampling", func(t *testing.T) {
inst := setup(t)
err := inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
{
Labels: lbls.String(),
Entries: []push.Entry{
{
Timestamp: time.Unix(0, 0),
Line: "ts=1 msg=hello",
},
},
},
},
})
require.NoError(t, err)
downsampleInstance(inst, 0)

for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
{
Labels: lbls.String(),
Entries: []push.Entry{
{
Timestamp: time.Unix(int64(10*i), 0),
Line: "foo bar foo bar",
},
},
},
},
})
require.NoError(t, err)

// downsample every 20s
if i%2 == 0 {
downsampleInstance(inst, int64(10*i))
}
}

expr, err := syntax.ParseSampleExpr(`count_over_time({test="test"}[20s])`)
require.NoError(t, err)

it, err := inst.QuerySample(context.Background(), expr, &logproto.QuerySamplesRequest{
Query: expr.String(),
Start: time.Unix(0, 0),
End: time.Unix(int64(10*30), 0),
Step: 20000,
})
require.NoError(t, err)
res, err := iter.ReadAllSamples(it)
require.NoError(t, err)
require.Equal(t, 1, len(res.Series))

require.Equal(t, lbls.String(), res.Series[0].GetLabels())

// end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals)
expectedDataPoints := ((10 * 30) / 20)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))
require.Equal(t, float64(1), res.Series[0].Samples[0].Value)

// after the first push there's 2 pushes per sample due to downsampling
require.Equal(t, float64(2), res.Series[0].Samples[expectedDataPoints-1].Value)

expr, err = syntax.ParseSampleExpr(`count_over_time({test="test"}[80s])`)
require.NoError(t, err)

it, err = inst.QuerySample(context.Background(), expr, &logproto.QuerySamplesRequest{
Query: expr.String(),
Start: time.Unix(0, 0),
End: time.Unix(int64(10*30), 0),
Step: 20000,
})
require.NoError(t, err)
res, err = iter.ReadAllSamples(it)
require.NoError(t, err)
require.Equal(t, 1, len(res.Series))

require.Equal(t, lbls.String(), res.Series[0].GetLabels())

// end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals)
expectedDataPoints = ((10 * 30) / 20)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))

// with a larger selection range of 80s, we expect to eventually get up to 8 per datapoint
// our pushes are spaced 10s apart, downsampled every 20s, and there's 10s step,
// so we expect to see the value increase by 2 every samples, maxing out and staying at 8 after 5 samples
require.Equal(t, float64(1), res.Series[0].Samples[0].Value)
require.Equal(t, float64(3), res.Series[0].Samples[1].Value)
require.Equal(t, float64(5), res.Series[0].Samples[2].Value)
require.Equal(t, float64(7), res.Series[0].Samples[3].Value)
require.Equal(t, float64(8), res.Series[0].Samples[4].Value)
require.Equal(t, float64(8), res.Series[0].Samples[expectedDataPoints-1].Value)
})

t.Run("test bytes_over_time samples", func(t *testing.T) {
inst := setup(t)
err := inst.Push(context.Background(), &push.PushRequest{
Expand All @@ -202,6 +315,9 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
},
})
require.NoError(t, err)

downsampleInstance(inst, 0)
for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
Expand All @@ -217,8 +333,8 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, int64(20*i))
}
require.NoError(t, err)

expr, err := syntax.ParseSampleExpr(`bytes_over_time({test="test"}[20s])`)
require.NoError(t, err)
Expand Down Expand Up @@ -343,6 +459,9 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
},
})
require.NoError(t, err)
downsampleInstance(inst, 0)

for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
Expand Down Expand Up @@ -397,8 +516,8 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, int64(20*i))
}
require.NoError(t, err)

for _, tt := range []struct {
name string
Expand Down
12 changes: 12 additions & 0 deletions pkg/pattern/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ func TestInstance_QuerySample(t *testing.T) {
return instance
}

downsampleInstance := func(inst *instance, ts model.Time) {
_ = inst.streams.ForEach(func(s *stream) (bool, error) {
inst.streams.WithLock(func() {
s.Downsample(ts)
})
return true, nil
})
}

ctx := context.Background()

thirtySeconds := int64(30000)
Expand Down Expand Up @@ -85,6 +94,7 @@ func TestInstance_QuerySample(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(instance, model.Time(lastTsMilli))

// 5 min query range
// 1 min step
Expand Down Expand Up @@ -203,6 +213,7 @@ func TestInstance_QuerySample(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(instance, model.Time(then+oneMin+thirtySeconds))

err = instance.Push(ctx, &logproto.PushRequest{
Streams: []push.Stream{
Expand Down Expand Up @@ -245,6 +256,7 @@ func TestInstance_QuerySample(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(instance, model.Time(then+oneMin+oneMin+oneMin+thirtySeconds))

// steps
start := then
Expand Down
Loading
Loading