Skip to content

Commit

Permalink
Count unflushed chunks in instance stats (#9479)
Browse files Browse the repository at this point in the history
The instance wasn't counting unflushed chunks in the stats. This change
checks that the chunk's `flushed time` is 0 which means it hasn't been
flushed yet.
  • Loading branch information
MasslessParticle committed May 18, 2023
1 parent 64c6f6b commit aeba51a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
5 changes: 1 addition & 4 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,9 +569,6 @@ func (i *instance) GetStats(ctx context.Context, req *logproto.IndexStatsRequest
from, through := req.From.Time(), req.Through.Time()

if err = i.forMatchingStreams(ctx, from, matchers, nil, func(s *stream) error {
// checks for equality against chunk flush fields
var zeroValueTime time.Time

// Consider streams which overlap our time range
if shouldConsiderStream(s, from, through) {
s.chunkMtx.RLock()
Expand All @@ -583,7 +580,7 @@ func (i *instance) GetStats(ctx context.Context, req *logproto.IndexStatsRequest
// by the TSDB manager+shipper
chkFrom, chkThrough := chk.chunk.Bounds()

if !chk.flushed.Equal(zeroValueTime) && from.Before(chkThrough) && through.After(chkFrom) {
if chk.flushed.IsZero() && from.Before(chkThrough) && through.After(chkFrom) {
hasChunkOverlap = true
res.Chunks++
factor := util.GetFactorOfTime(from.UnixNano(), through.UnixNano(), chkFrom.UnixNano(), chkThrough.UnixNano())
Expand Down
17 changes: 17 additions & 0 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,23 @@ func TestStreamShardingUsage(t *testing.T) {
})
}

func TestGetStats(t *testing.T) {
instance := defaultInstance(t)
resp, err := instance.GetStats(context.Background(), &logproto.IndexStatsRequest{
From: 0,
Through: 11000,
Matchers: `{host="agent"}`,
})
require.NoError(t, err)

require.Equal(t, &logproto.IndexStatsResponse{
Streams: 2,
Chunks: 2,
Bytes: 160,
Entries: 10,
}, resp)
}

func defaultInstance(t *testing.T) *instance {
ingesterConfig := defaultIngesterTestConfig(t)
defaultLimits := defaultLimitsTestConfig()
Expand Down

0 comments on commit aeba51a

Please sign in to comment.