Skip to content

Commit

Permalink
chore(stats): add pipeline_wrapper_filtered_lines (#12399)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashwanthgoli authored Mar 29, 2024
1 parent cf71ac7 commit c8c8477
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 106 deletions.
1 change: 1 addition & 0 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func RecordRangeAndInstantQueryMetrics(
"splits", stats.Summary.Splits,
"shards", stats.Summary.Shards,
"query_referenced_structured_metadata", stats.QueryReferencedStructuredMetadata(),
"pipeline_wrapper_filtered_lines", stats.PipelineWrapperFilteredLines(),
"chunk_refs_fetch_time", stats.ChunkRefsFetchTime(),
"cache_chunk_req", stats.Caches.Chunk.EntriesRequested,
"cache_chunk_hit", stats.Caches.Chunk.EntriesFound,
Expand Down
9 changes: 9 additions & 0 deletions pkg/logqlmodel/stats/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func (s *Store) Merge(m Store) {
s.TotalChunksRef += m.TotalChunksRef
s.TotalChunksDownloaded += m.TotalChunksDownloaded
s.CongestionControlLatency += m.CongestionControlLatency
s.PipelineWrapperFilteredLines += m.PipelineWrapperFilteredLines
s.ChunksDownloadTime += m.ChunksDownloadTime
s.ChunkRefsFetchTime += m.ChunkRefsFetchTime
s.Chunk.HeadChunkBytes += m.Chunk.HeadChunkBytes
Expand Down Expand Up @@ -312,6 +313,10 @@ func (r Result) CongestionControlLatency() time.Duration {
return time.Duration(r.Querier.Store.CongestionControlLatency)
}

// PipelineWrapperFilteredLines returns the total number of log lines dropped
// by pipeline wrappers, summed across the querier and ingester store stats.
func (r Result) PipelineWrapperFilteredLines() int64 {
	fromIngester := r.Ingester.Store.PipelineWrapperFilteredLines
	fromQuerier := r.Querier.Store.PipelineWrapperFilteredLines
	return fromIngester + fromQuerier
}

// TotalDuplicates returns the total count of duplicate chunk entries observed,
// summed across the querier and ingester store stats.
func (r Result) TotalDuplicates() int64 {
	total := r.Ingester.Store.Chunk.TotalDuplicates
	total += r.Querier.Store.Chunk.TotalDuplicates
	return total
}
Expand Down Expand Up @@ -397,6 +402,10 @@ func (c *Context) AddCongestionControlLatency(i time.Duration) {
atomic.AddInt64(&c.store.CongestionControlLatency, int64(i))
}

// AddPipelineWrapperFilteredLines records i additional log lines filtered out
// by a pipeline wrapper. Safe for concurrent use.
func (c *Context) AddPipelineWrapperFilteredLines(i int64) {
	atomic.AddInt64(&c.store.PipelineWrapperFilteredLines, i)
}

// AddPipelineWrapperFilterdLines is a misspelled alias kept so existing
// callers keep compiling.
//
// Deprecated: use AddPipelineWrapperFilteredLines instead.
func (c *Context) AddPipelineWrapperFilterdLines(i int64) {
	c.AddPipelineWrapperFilteredLines(i)
}

// AddChunksDownloaded records i additional chunks downloaded from the store.
// Safe for concurrent use.
func (c *Context) AddChunksDownloaded(i int64) {
	atomic.AddInt64(&c.store.TotalChunksDownloaded, i)
}
Expand Down
36 changes: 23 additions & 13 deletions pkg/logqlmodel/stats/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestResult(t *testing.T) {
stats.AddCacheRequest(IndexCache, 4)
stats.AddCacheRequest(ResultCache, 1)
stats.SetQueryReferencedStructuredMetadata()
stats.AddPipelineWrapperFilterdLines(1)

fakeIngesterQuery(ctx)
fakeIngesterQuery(ctx)
Expand All @@ -39,6 +40,7 @@ func TestResult(t *testing.T) {
TotalLinesSent: 60,
TotalReached: 2,
Store: Store{
PipelineWrapperFilteredLines: 2,
Chunk: Chunk{
HeadChunkBytes: 10,
HeadChunkLines: 20,
Expand All @@ -51,10 +53,11 @@ func TestResult(t *testing.T) {
},
Querier: Querier{
Store: Store{
TotalChunksRef: 50,
TotalChunksDownloaded: 60,
ChunksDownloadTime: time.Second.Nanoseconds(),
QueryReferencedStructured: true,
TotalChunksRef: 50,
TotalChunksDownloaded: 60,
ChunksDownloadTime: time.Second.Nanoseconds(),
QueryReferencedStructured: true,
PipelineWrapperFilteredLines: 1,
Chunk: Chunk{
HeadChunkBytes: 10,
HeadChunkLines: 20,
Expand Down Expand Up @@ -148,6 +151,7 @@ func fakeIngesterQuery(ctx context.Context) {
TotalBatches: 25,
TotalLinesSent: 30,
Store: Store{
PipelineWrapperFilteredLines: 1,
Chunk: Chunk{
HeadChunkBytes: 5,
HeadChunkLines: 10,
Expand All @@ -173,6 +177,7 @@ func TestResult_Merge(t *testing.T) {
TotalLinesSent: 60,
TotalReached: 2,
Store: Store{
PipelineWrapperFilteredLines: 4,
Chunk: Chunk{
HeadChunkBytes: 10,
HeadChunkLines: 20,
Expand All @@ -185,10 +190,11 @@ func TestResult_Merge(t *testing.T) {
},
Querier: Querier{
Store: Store{
TotalChunksRef: 50,
TotalChunksDownloaded: 60,
ChunksDownloadTime: time.Second.Nanoseconds(),
QueryReferencedStructured: true,
TotalChunksRef: 50,
TotalChunksDownloaded: 60,
ChunksDownloadTime: time.Second.Nanoseconds(),
QueryReferencedStructured: true,
PipelineWrapperFilteredLines: 2,
Chunk: Chunk{
HeadChunkBytes: 10,
HeadChunkLines: 20,
Expand Down Expand Up @@ -235,6 +241,7 @@ func TestResult_Merge(t *testing.T) {
TotalBatches: 2 * 50,
TotalLinesSent: 2 * 60,
Store: Store{
PipelineWrapperFilteredLines: 8,
Chunk: Chunk{
HeadChunkBytes: 2 * 10,
HeadChunkLines: 2 * 20,
Expand All @@ -248,10 +255,11 @@ func TestResult_Merge(t *testing.T) {
},
Querier: Querier{
Store: Store{
TotalChunksRef: 2 * 50,
TotalChunksDownloaded: 2 * 60,
ChunksDownloadTime: 2 * time.Second.Nanoseconds(),
QueryReferencedStructured: true,
TotalChunksRef: 2 * 50,
TotalChunksDownloaded: 2 * 60,
ChunksDownloadTime: 2 * time.Second.Nanoseconds(),
QueryReferencedStructured: true,
PipelineWrapperFilteredLines: 4,
Chunk: Chunk{
HeadChunkBytes: 2 * 10,
HeadChunkLines: 2 * 20,
Expand Down Expand Up @@ -306,13 +314,15 @@ func TestIngester(t *testing.T) {
statsCtx.AddDuplicates(10)
statsCtx.AddHeadChunkBytes(200)
statsCtx.SetQueryReferencedStructuredMetadata()
statsCtx.AddPipelineWrapperFilterdLines(1)
require.Equal(t, Ingester{
TotalReached: 1,
TotalChunksMatched: 100,
TotalBatches: 25,
TotalLinesSent: 30,
Store: Store{
QueryReferencedStructured: true,
QueryReferencedStructured: true,
PipelineWrapperFilteredLines: 1,
Chunk: Chunk{
HeadChunkBytes: 200,
CompressedBytes: 100,
Expand Down
Loading

0 comments on commit c8c8477

Please sign in to comment.