Skip to content

Commit

Permalink
feat: Add step param to Patterns Query API
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive committed Apr 19, 2024
1 parent 8c18463 commit eba1751
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 85 deletions.
2 changes: 0 additions & 2 deletions pkg/logproto/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,6 @@ func (m *ShardsRequest) LogToSpan(sp opentracing.Span) {

func (m *QueryPatternsRequest) GetCachingOptions() (res definitions.CachingOptions) { return }

func (m *QueryPatternsRequest) GetStep() int64 { return 0 }

func (m *QueryPatternsRequest) WithStartEnd(start, end time.Time) definitions.Request {
clone := *m
clone.Start = start
Expand Down
105 changes: 73 additions & 32 deletions pkg/logproto/pattern.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/logproto/pattern.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ message QueryPatternsRequest {
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false
];
int64 step = 4;
}

message QueryPatternsResponse {
Expand Down
39 changes: 31 additions & 8 deletions pkg/pattern/drain/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func (c Chunk) spaceFor(ts model.Time) bool {
}

// ForRange returns samples with only the values
// in the given range [start:end).
// start and end are in milliseconds since epoch.
func (c Chunk) ForRange(start, end model.Time) []logproto.PatternSample {
// in the given range [start:end) and aggregates them by step duration.
// start and end are in milliseconds since epoch. step is a duration in milliseconds.
func (c Chunk) ForRange(start, end, step model.Time) []logproto.PatternSample {
if len(c.Samples) == 0 {
return nil
}
Expand All @@ -66,11 +66,34 @@ func (c Chunk) ForRange(start, end model.Time) []logproto.PatternSample {
return c.Samples[i].Timestamp >= end
})
}
return c.Samples[lo:hi]

// Re-scale samples into step-sized buckets
currentStep := truncateTimestamp(c.Samples[lo].Timestamp, step)
outputSamples := []logproto.PatternSample{
{
Timestamp: currentStep,
Value: 0,
},
}
for _, sample := range c.Samples[lo:hi] {
if sample.Timestamp >= currentStep+step {
stepForSample := truncateTimestamp(sample.Timestamp, step)
for i := currentStep + step; i <= stepForSample; i += step {
outputSamples = append(outputSamples, logproto.PatternSample{
Timestamp: i,
Value: 0,
})
}
currentStep = stepForSample
}
outputSamples[len(outputSamples)-1].Value += sample.Value
}

return outputSamples
}

func (c *Chunks) Add(ts model.Time) {
t := truncateTimestamp(ts)
t := truncateTimestamp(ts, timeResolution)

if len(*c) == 0 {
*c = append(*c, newChunk(t))
Expand All @@ -91,10 +114,10 @@ func (c *Chunks) Add(ts model.Time) {
})
}

func (c Chunks) Iterator(pattern string, from, through model.Time) iter.Iterator {
func (c Chunks) Iterator(pattern string, from, through, step model.Time) iter.Iterator {
iters := make([]iter.Iterator, 0, len(c))
for _, chunk := range c {
samples := chunk.ForRange(from, through)
samples := chunk.ForRange(from, through, step)
if len(samples) == 0 {
continue
}
Expand Down Expand Up @@ -173,4 +196,4 @@ func (c *Chunks) size() int {
return size
}

func truncateTimestamp(ts model.Time) model.Time { return ts - ts%timeResolution }
func truncateTimestamp(ts, step model.Time) model.Time { return ts - ts%step }
Loading

0 comments on commit eba1751

Please sign in to comment.