test: fix tests
trevorwhitney committed Jul 10, 2024
1 parent ca2303b commit 51870f7
Showing 5 changed files with 160 additions and 9 deletions.
1 change: 1 addition & 0 deletions pkg/pattern/flush.go
@@ -27,6 +27,7 @@ func (i *Ingester) Flush() {

func (i *Ingester) flush(mayRemoveStreams bool) {
i.sweepUsers(true, mayRemoveStreams)
i.downsampleMetrics(model.Now())

// Close the flush queues, to unblock waiting workers.
for _, flushQueue := range i.flushQueues {
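
The only non-test change in this commit is the downsampleMetrics call added to flush above; the method itself sits outside this hunk. Judging from the downsampleInstance helpers added to the tests below, it most likely walks every stream of every tenant instance and folds the buffered raw samples into timestamped samples under the stream lock. A minimal sketch of that shape — the instance iteration and every name other than Downsample, ForEach, and WithLock is an assumption, not code from this commit:

func (i *Ingester) downsampleMetrics(ts model.Time) {
	for _, inst := range i.getInstances() { // hypothetical accessor over tenant instances
		_ = inst.streams.ForEach(func(s *stream) (bool, error) {
			inst.streams.WithLock(func() {
				// fold raw samples buffered since the last pass into a timestamped sample
				s.Downsample(ts)
			})
			return true, nil
		})
	}
}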
129 changes: 124 additions & 5 deletions pkg/pattern/ingester_test.go
@@ -9,6 +9,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

@@ -39,6 +40,16 @@ func setup(t *testing.T) *instance {
return inst
}

func downsampleInstance(inst *instance, tts int64) {
ts := model.TimeFromUnixNano(time.Unix(tts, 0).UnixNano())
_ = inst.streams.ForEach(func(s *stream) (bool, error) {
inst.streams.WithLock(func() {
s.Downsample(ts)
})
return true, nil
})
}

func TestInstancePushQuery(t *testing.T) {
inst := setup(t)
err := inst.Push(context.Background(), &push.PushRequest{
@@ -55,6 +66,7 @@ func TestInstancePushQuery(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, 20)

err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
@@ -70,6 +82,7 @@ func TestInstancePushQuery(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, 30)

for i := 0; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
@@ -87,6 +100,7 @@ func TestInstancePushQuery(t *testing.T) {
})
require.NoError(t, err)
}
downsampleInstance(inst, 30)

it, err := inst.Iterator(context.Background(), &logproto.QueryPatternsRequest{
Query: "{test=\"test\"}",
@@ -115,6 +129,9 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
},
})
require.NoError(t, err)
downsampleInstance(inst, 0)

for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
@@ -130,8 +147,8 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, int64(20*i))
}
require.NoError(t, err)

expr, err := syntax.ParseSampleExpr(`count_over_time({test="test"}[20s])`)
require.NoError(t, err)
@@ -149,10 +166,11 @@ func TestInstancePushQuerySamples(t *testing.T) {

require.Equal(t, lbls.String(), res.Series[0].GetLabels())

// end - start / step -- (start is 0, step is 10s)
// end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals)
expectedDataPoints := ((20 * 30) / 10)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))
require.Equal(t, float64(1), res.Series[0].Samples[0].Value)
require.Equal(t, float64(1), res.Series[0].Samples[expectedDataPoints-1].Value)

expr, err = syntax.ParseSampleExpr(`count_over_time({test="test"}[80s])`)
require.NoError(t, err)
@@ -170,7 +188,7 @@ func TestInstancePushQuerySamples(t *testing.T) {

require.Equal(t, lbls.String(), res.Series[0].GetLabels())

// end - start / step -- (start is 0, step is 10s)
// end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals)
expectedDataPoints = ((20 * 30) / 10)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))

@@ -187,6 +205,101 @@ func TestInstancePushQuerySamples(t *testing.T) {
require.Equal(t, float64(4), res.Series[0].Samples[expectedDataPoints-1].Value)
})

t.Run("test count_over_time samples with downsampling", func(t *testing.T) {
inst := setup(t)
err := inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
{
Labels: lbls.String(),
Entries: []push.Entry{
{
Timestamp: time.Unix(0, 0),
Line: "ts=1 msg=hello",
},
},
},
},
})
require.NoError(t, err)
downsampleInstance(inst, 0)

for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
{
Labels: lbls.String(),
Entries: []push.Entry{
{
Timestamp: time.Unix(int64(10*i), 0),
Line: "foo bar foo bar",
},
},
},
},
})
require.NoError(t, err)

// downsample every 20s
if i%2 == 0 {
downsampleInstance(inst, int64(10*i))
}
}

expr, err := syntax.ParseSampleExpr(`count_over_time({test="test"}[20s])`)
require.NoError(t, err)

it, err := inst.QuerySample(context.Background(), expr, &logproto.QuerySamplesRequest{
Query: expr.String(),
Start: time.Unix(0, 0),
End: time.Unix(int64(10*30), 0),
Step: 20000,
})
require.NoError(t, err)
res, err := iter.ReadAllSamples(it)
require.NoError(t, err)
require.Equal(t, 1, len(res.Series))

require.Equal(t, lbls.String(), res.Series[0].GetLabels())

// (end - start) / step -- (start is 0, end is 300s, step is 20s; pushes every 10s, downsampled at 20s intervals)
expectedDataPoints := ((10 * 30) / 20)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))
require.Equal(t, float64(1), res.Series[0].Samples[0].Value)

// after the first push there are 2 pushes per sample due to downsampling
require.Equal(t, float64(2), res.Series[0].Samples[expectedDataPoints-1].Value)

expr, err = syntax.ParseSampleExpr(`count_over_time({test="test"}[80s])`)
require.NoError(t, err)

it, err = inst.QuerySample(context.Background(), expr, &logproto.QuerySamplesRequest{
Query: expr.String(),
Start: time.Unix(0, 0),
End: time.Unix(int64(10*30), 0),
Step: 20000,
})
require.NoError(t, err)
res, err = iter.ReadAllSamples(it)
require.NoError(t, err)
require.Equal(t, 1, len(res.Series))

require.Equal(t, lbls.String(), res.Series[0].GetLabels())

// (end - start) / step -- (start is 0, end is 300s, step is 20s; pushes every 10s, downsampled at 20s intervals)
expectedDataPoints = ((10 * 30) / 20)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))

// with a larger selection range of 80s, we expect to eventually get up to 8 per datapoint:
// our pushes are spaced 10s apart and downsampled every 20s, and the query step is 20s,
// so we expect the value to increase by 2 with every sample, maxing out and staying at 8 after 5 samples
require.Equal(t, float64(1), res.Series[0].Samples[0].Value)
require.Equal(t, float64(3), res.Series[0].Samples[1].Value)
require.Equal(t, float64(5), res.Series[0].Samples[2].Value)
require.Equal(t, float64(7), res.Series[0].Samples[3].Value)
require.Equal(t, float64(8), res.Series[0].Samples[4].Value)
require.Equal(t, float64(8), res.Series[0].Samples[expectedDataPoints-1].Value)
})

t.Run("test bytes_over_time samples", func(t *testing.T) {
inst := setup(t)
err := inst.Push(context.Background(), &push.PushRequest{
@@ -202,6 +315,9 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
},
})
require.NoError(t, err)

downsampleInstance(inst, 0)
for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
@@ -217,8 +333,8 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, int64(20*i))
}
require.NoError(t, err)

expr, err := syntax.ParseSampleExpr(`bytes_over_time({test="test"}[20s])`)
require.NoError(t, err)
@@ -343,6 +459,9 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
},
})
require.NoError(t, err)
downsampleInstance(inst, 0)

for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
@@ -397,8 +516,8 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, int64(20*i))
}
require.NoError(t, err)

for _, tt := range []struct {
name string
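
The values asserted in the new downsampling subtest follow directly from the cadence it sets up: entries every 10s, a downsample pass every 20s (so each emitted sample carries a count of 2, except the lone entry at t=0), and a 20s query step over an 80s selection range. A standalone back-of-the-envelope check reproduces the asserted 1, 3, 5, 7, 8, 8, ... progression; the left-open (t-80s, t] window is an assumption about the range semantics, not code from this commit:

package main

import "fmt"

func main() {
	// Downsampled count samples: 1 entry at t=0, then 2 entries folded together
	// every 20s (pushes arrive every 10s, downsample passes run every 20s).
	counts := map[int]float64{0: 1}
	for t := 20; t <= 300; t += 20 {
		counts[t] = 2
	}

	const rangeSec, stepSec = 80, 20
	for t := 0; t <= 100; t += stepSec { // the first few steps already show 1, 3, 5, 7, 8, 8
		var sum float64
		for ts, c := range counts {
			if ts > t-rangeSec && ts <= t { // assumed (t-80s, t] selection window
				sum += c
			}
		}
		fmt.Printf("t=%3ds count_over_time[80s] = %g\n", t, sum)
	}
}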
12 changes: 12 additions & 0 deletions pkg/pattern/instance_test.go
@@ -36,6 +36,15 @@ func TestInstance_QuerySample(t *testing.T) {
return instance
}

downsampleInstance := func(inst *instance, ts model.Time) {
_ = inst.streams.ForEach(func(s *stream) (bool, error) {
inst.streams.WithLock(func() {
s.Downsample(ts)
})
return true, nil
})
}

ctx := context.Background()

thirtySeconds := int64(30000)
@@ -85,6 +94,7 @@ func TestInstance_QuerySample(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(instance, model.Time(lastTsMilli))

// 5 min query range
// 1 min step
@@ -203,6 +213,7 @@ func TestInstance_QuerySample(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(instance, model.Time(then+oneMin+thirtySeconds))

err = instance.Push(ctx, &logproto.PushRequest{
Streams: []push.Stream{
@@ -245,6 +256,7 @@ func TestInstance_QuerySample(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(instance, model.Time(then+oneMin+oneMin+oneMin+thirtySeconds))

// steps
start := then
23 changes: 19 additions & 4 deletions pkg/pattern/metric/chunk.go
@@ -39,7 +39,7 @@ type Chunks struct {
lock sync.RWMutex
logger log.Logger
metrics metrics
rawSamples Samples
rawSamples SamplesWithoutTS
service string
}

@@ -59,7 +59,7 @@ func NewChunks(labels labels.Labels, chunkMetrics *ChunkMetrics, logger log.Logger
chunks: []*Chunk{},
labels: labels,
logger: logger,
rawSamples: Samples{},
rawSamples: SamplesWithoutTS{},
service: service,

metrics: metrics{
@@ -73,7 +73,7 @@ func (c *Chunks) Observe(bytes, count float64, ts model.Time) {
c.lock.Lock()
defer c.lock.Unlock()

c.rawSamples = append(c.rawSamples, newSample(bytes, count, ts))
c.rawSamples = append(c.rawSamples, newSampleWithoutTS(bytes, count))
c.metrics.samples.Inc()
}

@@ -193,6 +193,11 @@ type Sample struct {
Count float64
}

type SampleWithoutTS struct {
Bytes float64
Count float64
}

func newSample(bytes, count float64, ts model.Time) Sample {
return Sample{
Timestamp: ts,
@@ -201,7 +206,17 @@ func newSample(bytes, count float64, ts model.Time) Sample {
}
}

type Samples []Sample
func newSampleWithoutTS(bytes, count float64) SampleWithoutTS {
return SampleWithoutTS{
Bytes: bytes,
Count: count,
}
}

type (
Samples []Sample
SamplesWithoutTS []SampleWithoutTS
)

type Chunk struct {
Samples Samples
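
With this change Observe only buffers bytes/count pairs; a timestamp is applied later, when a downsample pass folds the buffer into the chunk. The downsampling side of pkg/pattern/metric is not part of this diff, so the following is only a rough sketch of that step built from the types above — the receiver, the chunk handling, and the method body are assumptions, not the commit's implementation:

func (c *Chunks) Downsample(ts model.Time) {
	c.lock.Lock()
	defer c.lock.Unlock()

	if len(c.rawSamples) == 0 {
		return
	}

	// Sum everything observed since the last pass into a single timestamped sample.
	var bytes, count float64
	for _, s := range c.rawSamples {
		bytes += s.Bytes
		count += s.Count
	}

	if len(c.chunks) > 0 { // chunk creation/rotation elided for brevity
		cur := c.chunks[len(c.chunks)-1]
		cur.Samples = append(cur.Samples, newSample(bytes, count, ts))
	}

	c.rawSamples = c.rawSamples[:0] // reset the buffer for the next interval
}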
4 changes: 4 additions & 0 deletions pkg/pattern/stream_test.go
@@ -136,6 +136,7 @@ func TestSampleIterator(t *testing.T) {
Line: "ts=2 msg=hello",
},
})
stream.Downsample(model.TimeFromUnix(20))

require.NoError(t, err)

@@ -185,6 +186,7 @@ func TestSampleIterator(t *testing.T) {
},
})
require.NoError(t, err)
stream.Downsample(model.TimeFromUnix(20))

err = stream.Push(context.Background(), []push.Entry{
{
@@ -197,6 +199,7 @@ func TestSampleIterator(t *testing.T) {
},
})
require.NoError(t, err)
stream.Downsample(model.TimeFromUnix(40))

t.Run("non-overlapping timestamps", func(t *testing.T) {
expr, err := syntax.ParseSampleExpr("count_over_time({foo=\"bar\"}[5s])")
@@ -273,6 +276,7 @@ func TestSampleIterator(t *testing.T) {
Line: "ts=2 msg=hello",
},
})
stream.Downsample(model.TimeFromUnixNano(time.Unix(26, 999).UnixNano()))

require.NoError(t, err)

