diff --git a/pkg/logproto/compat.go b/pkg/logproto/compat.go index 212abc38633ec..28f8fe4843556 100644 --- a/pkg/logproto/compat.go +++ b/pkg/logproto/compat.go @@ -448,8 +448,6 @@ func (m *ShardsRequest) LogToSpan(sp opentracing.Span) { func (m *QueryPatternsRequest) GetCachingOptions() (res definitions.CachingOptions) { return } -func (m *QueryPatternsRequest) GetStep() int64 { return 0 } - func (m *QueryPatternsRequest) WithStartEnd(start, end time.Time) definitions.Request { clone := *m clone.Start = start diff --git a/pkg/logproto/pattern.pb.go b/pkg/logproto/pattern.pb.go index b1b5755e9dfa0..a666a32850127 100644 --- a/pkg/logproto/pattern.pb.go +++ b/pkg/logproto/pattern.pb.go @@ -39,6 +39,7 @@ type QueryPatternsRequest struct { Query string `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` Start time.Time `protobuf:"bytes,2,opt,name=start,proto3,stdtime" json:"start"` End time.Time `protobuf:"bytes,3,opt,name=end,proto3,stdtime" json:"end"` + Step int64 `protobuf:"varint,4,opt,name=step,proto3" json:"step,omitempty"` } func (m *QueryPatternsRequest) Reset() { *m = QueryPatternsRequest{} } @@ -94,6 +95,13 @@ func (m *QueryPatternsRequest) GetEnd() time.Time { return time.Time{} } +func (m *QueryPatternsRequest) GetStep() int64 { + if m != nil { + return m.Step + } + return 0 +} + type QueryPatternsResponse struct { Series []*PatternSeries `protobuf:"bytes,1,rep,name=series,proto3" json:"series,omitempty"` } @@ -242,37 +250,38 @@ func init() { func init() { proto.RegisterFile("pkg/logproto/pattern.proto", fileDescriptor_aaf4192acc66a4ea) } var fileDescriptor_aaf4192acc66a4ea = []byte{ - // 470 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0x31, 0x6f, 0xd4, 0x30, - 0x14, 0x8e, 0x1b, 0xae, 0xd7, 0xba, 0x62, 0x31, 0x57, 0x88, 0x82, 0xe4, 0x9c, 0xb2, 0x70, 0x53, - 0x0c, 0x57, 0x09, 0x24, 0xc6, 0x9b, 0x18, 0x40, 0x2a, 0x81, 0x09, 0xc1, 0x90, 0x6b, 0x5d, 0xe7, - 0xd4, 0x38, 0x4e, 0x63, 0xbb, 0x12, 0x1b, 0x3f, 0xe1, 0x7e, 0x02, 0x23, 0x3f, 0xa5, 0xe3, 0x8d, - 0x15, 0x43, 0xe1, 0x72, 0x0b, 0x63, 0x7f, 0x02, 0x8a, 0xed, 0xf4, 0xae, 0x15, 0x1d, 0x58, 0x12, - 0xbf, 0xf7, 0x7d, 0xef, 0xf3, 0xf7, 0xde, 0x33, 0x0c, 0xab, 0x53, 0x46, 0x0a, 0xc1, 0xaa, 0x5a, - 0x28, 0x41, 0xaa, 0x4c, 0x29, 0x5a, 0x97, 0x89, 0x89, 0xd0, 0x4e, 0x97, 0x0f, 0x07, 0x4c, 0x30, - 0x61, 0x29, 0xed, 0xc9, 0xe2, 0x61, 0xc4, 0x84, 0x60, 0x05, 0x25, 0x26, 0x9a, 0xea, 0x13, 0xa2, - 0x66, 0x9c, 0x4a, 0x95, 0xf1, 0xca, 0x11, 0x9e, 0xde, 0x12, 0xef, 0x0e, 0x0e, 0x7c, 0xd4, 0x82, - 0x95, 0x96, 0xb9, 0xf9, 0xd8, 0x64, 0xfc, 0x1d, 0xc0, 0xc1, 0x7b, 0x4d, 0xeb, 0xaf, 0x87, 0xd6, - 0x89, 0x4c, 0xe9, 0x99, 0xa6, 0x52, 0xa1, 0x01, 0xec, 0x9d, 0xb5, 0xf9, 0x00, 0x0c, 0xc1, 0x68, - 0x37, 0xb5, 0x01, 0x7a, 0x0d, 0x7b, 0x52, 0x65, 0xb5, 0x0a, 0xb6, 0x86, 0x60, 0xb4, 0x37, 0x0e, - 0x13, 0xeb, 0x28, 0xe9, 0x1c, 0x25, 0x1f, 0x3b, 0x47, 0x93, 0x9d, 0x8b, 0xab, 0xc8, 0x9b, 0xff, - 0x8a, 0x40, 0x6a, 0x4b, 0xd0, 0x4b, 0xe8, 0xd3, 0xf2, 0x38, 0xf0, 0xff, 0xa3, 0xb2, 0x2d, 0x88, - 0xdf, 0xc0, 0xfd, 0x3b, 0x0e, 0x65, 0x25, 0x4a, 0x49, 0x11, 0x81, 0xdb, 0x92, 0xd6, 0x33, 0x2a, - 0x03, 0x30, 0xf4, 0x47, 0x7b, 0xe3, 0x27, 0xc9, 0x4d, 0xc7, 0x8e, 0xfb, 0xc1, 0xc0, 0xa9, 0xa3, - 0xc5, 0x9f, 0xe1, 0xc3, 0x5b, 0x00, 0x0a, 0x60, 0xdf, 0x6d, 0xc0, 0xb5, 0xd9, 0x85, 0xe8, 0x05, - 0xec, 0xcb, 0x8c, 0x57, 0x05, 0x95, 0xc1, 0xd6, 0x7d, 0xe2, 0x06, 0x4f, 0x3b, 0x5e, 0xac, 0xd6, - 0xea, 0x26, 0x83, 0xde, 0xc1, 0xdd, 0x9b, 0x05, 0x19, 0x7d, 0x7f, 0x42, 0xda, 0xd6, 0x7e, 0x5e, - 0x45, 0xcf, 0xd8, 0x4c, 0xe5, 0x7a, 0x9a, 0x1c, 0x09, 0xde, 0x6e, 0x93, 0x53, 0x95, 0x53, 0x2d, - 0xc9, 0x91, 0xe0, 0x5c, 0x94, 0x84, 0x8b, 0x63, 0x5a, 0x98, 0x81, 0xa4, 0x6b, 0x85, 0x76, 0x23, - 0xe7, 0x59, 0xa1, 0xa9, 0x99, 0xbd, 0x9f, 0xda, 0x60, 0x3c, 0x07, 0xb0, 0xef, 0xae, 0x45, 0xaf, - 0xe0, 0x83, 0x43, 0x2d, 0x73, 0xb4, 0xbf, 0xe1, 0x55, 0xcb, 0xdc, 0xad, 0x34, 0x7c, 0x7c, 0x37, - 0x6d, 0xe7, 0x18, 0x7b, 0xe8, 0x2d, 0xec, 0x99, 0x11, 0x23, 0xbc, 0xa6, 0xfc, 0xeb, 0x55, 0x84, - 0xd1, 0xbd, 0x78, 0xa7, 0xf5, 0x1c, 0x4c, 0xbe, 0x2c, 0x96, 0xd8, 0xbb, 0x5c, 0x62, 0xef, 0x7a, - 0x89, 0xc1, 0xb7, 0x06, 0x83, 0x1f, 0x0d, 0x06, 0x17, 0x0d, 0x06, 0x8b, 0x06, 0x83, 0xdf, 0x0d, - 0x06, 0x7f, 0x1a, 0xec, 0x5d, 0x37, 0x18, 0xcc, 0x57, 0xd8, 0x5b, 0xac, 0xb0, 0x77, 0xb9, 0xc2, - 0xde, 0xa7, 0xcd, 0x91, 0xb0, 0x3a, 0x3b, 0xc9, 0xca, 0x8c, 0x14, 0xe2, 0x74, 0x46, 0xce, 0x0f, - 0xc8, 0xe6, 0xb3, 0x9e, 0x6e, 0x9b, 0xdf, 0xc1, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xec, 0xd6, - 0xbc, 0xfc, 0x4a, 0x03, 0x00, 0x00, + // 483 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0xb1, 0x6e, 0xd3, 0x40, + 0x18, 0xf6, 0xd5, 0x49, 0xd3, 0x5e, 0xc5, 0x72, 0xa4, 0x60, 0x19, 0xe9, 0x1c, 0x79, 0x21, 0x93, + 0x0f, 0x52, 0x09, 0x24, 0xc6, 0x4c, 0x0c, 0x20, 0x15, 0xc3, 0x84, 0x60, 0x70, 0xda, 0xbf, 0xb6, + 0x55, 0xdb, 0xe7, 0xfa, 0xee, 0x2a, 0xb1, 0xf1, 0x08, 0x79, 0x0c, 0x1e, 0x80, 0x87, 0xe8, 0x98, + 0xb1, 0x62, 0x28, 0xc4, 0x59, 0x18, 0xfb, 0x08, 0xc8, 0x77, 0x76, 0x93, 0x56, 0x74, 0xe8, 0x92, + 0xdc, 0xff, 0x7f, 0xdf, 0xff, 0xf9, 0xbb, 0xff, 0x3b, 0xec, 0x96, 0xa7, 0x31, 0xcb, 0x78, 0x5c, + 0x56, 0x5c, 0x72, 0x56, 0x46, 0x52, 0x42, 0x55, 0x04, 0xba, 0x22, 0x3b, 0x5d, 0xdf, 0x1d, 0xc6, + 0x3c, 0xe6, 0x86, 0xd2, 0x9c, 0x0c, 0xee, 0x7a, 0x31, 0xe7, 0x71, 0x06, 0x4c, 0x57, 0x33, 0x75, + 0xc2, 0x64, 0x9a, 0x83, 0x90, 0x51, 0x5e, 0xb6, 0x84, 0x67, 0xb7, 0xc4, 0xbb, 0x43, 0x0b, 0x3e, + 0x6e, 0xc0, 0x52, 0x89, 0x44, 0xff, 0x98, 0xa6, 0xff, 0x13, 0xe1, 0xe1, 0x07, 0x05, 0xd5, 0xb7, + 0x43, 0xe3, 0x44, 0x84, 0x70, 0xa6, 0x40, 0x48, 0x32, 0xc4, 0xfd, 0xb3, 0xa6, 0xef, 0xa0, 0x11, + 0x1a, 0xef, 0x86, 0xa6, 0x20, 0x6f, 0x70, 0x5f, 0xc8, 0xa8, 0x92, 0xce, 0xd6, 0x08, 0x8d, 0xf7, + 0x26, 0x6e, 0x60, 0x1c, 0x05, 0x9d, 0xa3, 0xe0, 0x53, 0xe7, 0x68, 0xba, 0x73, 0x71, 0xe5, 0x59, + 0xf3, 0xdf, 0x1e, 0x0a, 0xcd, 0x08, 0x79, 0x85, 0x6d, 0x28, 0x8e, 0x1d, 0xfb, 0x01, 0x93, 0xcd, + 0x00, 0x21, 0xb8, 0x27, 0x24, 0x94, 0x4e, 0x6f, 0x84, 0xc6, 0x76, 0xa8, 0xcf, 0xfe, 0x5b, 0xbc, + 0x7f, 0xc7, 0xb5, 0x28, 0x79, 0x21, 0x80, 0x30, 0xbc, 0x2d, 0xa0, 0x4a, 0x41, 0x38, 0x68, 0x64, + 0x8f, 0xf7, 0x26, 0x4f, 0x83, 0x9b, 0x2d, 0xb4, 0xdc, 0x8f, 0x1a, 0x0e, 0x5b, 0x9a, 0xff, 0x05, + 0x3f, 0xba, 0x05, 0x10, 0x07, 0x0f, 0xda, 0x54, 0xda, 0xab, 0x77, 0x25, 0x79, 0x89, 0x07, 0x22, + 0xca, 0xcb, 0x0c, 0x84, 0xb3, 0x75, 0x9f, 0xb8, 0xc6, 0xc3, 0x8e, 0xe7, 0xcb, 0xb5, 0xba, 0xee, + 0x90, 0xf7, 0x78, 0xf7, 0x26, 0x34, 0xad, 0x6f, 0x4f, 0x59, 0x73, 0xdd, 0x5f, 0x57, 0xde, 0xf3, + 0x38, 0x95, 0x89, 0x9a, 0x05, 0x47, 0x3c, 0x6f, 0x12, 0xce, 0x41, 0x26, 0xa0, 0x04, 0x3b, 0xe2, + 0x79, 0xce, 0x0b, 0x96, 0xf3, 0x63, 0xc8, 0xf4, 0x92, 0xc2, 0xb5, 0x42, 0x93, 0xd2, 0x79, 0x94, + 0x29, 0xd0, 0x79, 0xd8, 0xa1, 0x29, 0x26, 0x73, 0x84, 0x07, 0xed, 0x67, 0xc9, 0x6b, 0xdc, 0x3b, + 0x54, 0x22, 0x21, 0xfb, 0x1b, 0x5e, 0x95, 0x48, 0xda, 0x98, 0xdd, 0x27, 0x77, 0xdb, 0x66, 0x8f, + 0xbe, 0x45, 0xde, 0xe1, 0xbe, 0x5e, 0x31, 0xa1, 0x6b, 0xca, 0xff, 0x5e, 0x8a, 0xeb, 0xdd, 0x8b, + 0x77, 0x5a, 0x2f, 0xd0, 0xf4, 0xeb, 0x62, 0x49, 0xad, 0xcb, 0x25, 0xb5, 0xae, 0x97, 0x14, 0x7d, + 0xaf, 0x29, 0xfa, 0x51, 0x53, 0x74, 0x51, 0x53, 0xb4, 0xa8, 0x29, 0xfa, 0x53, 0x53, 0xf4, 0xb7, + 0xa6, 0xd6, 0x75, 0x4d, 0xd1, 0x7c, 0x45, 0xad, 0xc5, 0x8a, 0x5a, 0x97, 0x2b, 0x6a, 0x7d, 0xde, + 0x5c, 0x49, 0x5c, 0x45, 0x27, 0x51, 0x11, 0xb1, 0x8c, 0x9f, 0xa6, 0xec, 0xfc, 0x80, 0x6d, 0x3e, + 0xf5, 0xd9, 0xb6, 0xfe, 0x3b, 0xf8, 0x17, 0x00, 0x00, 0xff, 0xff, 0x3b, 0x4f, 0x5c, 0x50, 0x5e, + 0x03, 0x00, 0x00, } func (this *QueryPatternsRequest) Equal(that interface{}) bool { @@ -303,6 +312,9 @@ func (this *QueryPatternsRequest) Equal(that interface{}) bool { if !this.End.Equal(that1.End) { return false } + if this.Step != that1.Step { + return false + } return true } func (this *QueryPatternsResponse) Equal(that interface{}) bool { @@ -397,11 +409,12 @@ func (this *QueryPatternsRequest) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 7) + s := make([]string, 0, 8) s = append(s, "&logproto.QueryPatternsRequest{") s = append(s, "Query: "+fmt.Sprintf("%#v", this.Query)+",\n") s = append(s, "Start: "+fmt.Sprintf("%#v", this.Start)+",\n") s = append(s, "End: "+fmt.Sprintf("%#v", this.End)+",\n") + s = append(s, "Step: "+fmt.Sprintf("%#v", this.Step)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -614,6 +627,11 @@ func (m *QueryPatternsRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Step != 0 { + i = encodeVarintPattern(dAtA, i, uint64(m.Step)) + i-- + dAtA[i] = 0x20 + } n1, err1 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.End, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.End):]) if err1 != nil { return 0, err1 @@ -779,6 +797,9 @@ func (m *QueryPatternsRequest) Size() (n int) { n += 1 + l + sovPattern(uint64(l)) l = github_com_gogo_protobuf_types.SizeOfStdTime(m.End) n += 1 + l + sovPattern(uint64(l)) + if m.Step != 0 { + n += 1 + sovPattern(uint64(m.Step)) + } return n } @@ -845,6 +866,7 @@ func (this *QueryPatternsRequest) String() string { `Query:` + fmt.Sprintf("%v", this.Query) + `,`, `Start:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Start), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, `End:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.End), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `Step:` + fmt.Sprintf("%v", this.Step) + `,`, `}`, }, "") return s @@ -1026,6 +1048,25 @@ func (m *QueryPatternsRequest) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Step", wireType) + } + m.Step = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPattern + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Step |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipPattern(dAtA[iNdEx:]) diff --git a/pkg/logproto/pattern.proto b/pkg/logproto/pattern.proto index 66a6c017b1926..e92a201b3a8b1 100644 --- a/pkg/logproto/pattern.proto +++ b/pkg/logproto/pattern.proto @@ -24,6 +24,7 @@ message QueryPatternsRequest { (gogoproto.stdtime) = true, (gogoproto.nullable) = false ]; + int64 step = 4; } message QueryPatternsResponse { diff --git a/pkg/pattern/drain/chunk.go b/pkg/pattern/drain/chunk.go index e438bb6b7c561..910000d2d5bdc 100644 --- a/pkg/pattern/drain/chunk.go +++ b/pkg/pattern/drain/chunk.go @@ -43,9 +43,9 @@ func (c Chunk) spaceFor(ts model.Time) bool { } // ForRange returns samples with only the values -// in the given range [start:end). -// start and end are in milliseconds since epoch. -func (c Chunk) ForRange(start, end model.Time) []logproto.PatternSample { +// in the given range [start:end) and aggregates them by step duration. +// start and end are in milliseconds since epoch. step is a duration in milliseconds. +func (c Chunk) ForRange(start, end, step model.Time) []logproto.PatternSample { if len(c.Samples) == 0 { return nil } @@ -66,11 +66,34 @@ func (c Chunk) ForRange(start, end model.Time) []logproto.PatternSample { return c.Samples[i].Timestamp >= end }) } - return c.Samples[lo:hi] + + // Re-scale samples into step-sized buckets + currentStep := truncateTimestamp(c.Samples[lo].Timestamp, step) + outputSamples := []logproto.PatternSample{ + { + Timestamp: currentStep, + Value: 0, + }, + } + for _, sample := range c.Samples[lo:hi] { + if sample.Timestamp >= currentStep+step { + stepForSample := truncateTimestamp(sample.Timestamp, step) + for i := currentStep + step; i <= stepForSample; i += step { + outputSamples = append(outputSamples, logproto.PatternSample{ + Timestamp: i, + Value: 0, + }) + } + currentStep = stepForSample + } + outputSamples[len(outputSamples)-1].Value += sample.Value + } + + return outputSamples } func (c *Chunks) Add(ts model.Time) { - t := truncateTimestamp(ts) + t := truncateTimestamp(ts, timeResolution) if len(*c) == 0 { *c = append(*c, newChunk(t)) @@ -91,10 +114,10 @@ func (c *Chunks) Add(ts model.Time) { }) } -func (c Chunks) Iterator(pattern string, from, through model.Time) iter.Iterator { +func (c Chunks) Iterator(pattern string, from, through, step model.Time) iter.Iterator { iters := make([]iter.Iterator, 0, len(c)) for _, chunk := range c { - samples := chunk.ForRange(from, through) + samples := chunk.ForRange(from, through, step) if len(samples) == 0 { continue } @@ -173,4 +196,4 @@ func (c *Chunks) size() int { return size } -func truncateTimestamp(ts model.Time) model.Time { return ts - ts%timeResolution } +func truncateTimestamp(ts, step model.Time) model.Time { return ts - ts%step } diff --git a/pkg/pattern/drain/chunk_test.go b/pkg/pattern/drain/chunk_test.go index e75c70411a7be..6a6b999571aa3 100644 --- a/pkg/pattern/drain/chunk_test.go +++ b/pkg/pattern/drain/chunk_test.go @@ -30,7 +30,7 @@ func TestIterator(t *testing.T) { cks.Add(2*timeResolution + 1) cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds()) + timeResolution + 1) - it := cks.Iterator("test", model.Time(0), model.Time(time.Hour.Nanoseconds())) + it := cks.Iterator("test", model.Time(0), model.Time(time.Hour.Nanoseconds()), timeResolution) require.NotNil(t, it) var samples []logproto.PatternSample @@ -64,9 +64,9 @@ func TestForRange(t *testing.T) { { name: "No Overlap", c: &Chunk{Samples: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, - {Timestamp: 5, Value: 6}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, }}, start: 10, end: 20, @@ -75,83 +75,116 @@ func TestForRange(t *testing.T) { { name: "Complete Overlap", c: &Chunk{Samples: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, - {Timestamp: 5, Value: 6}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, }}, start: 0, end: 10, expected: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, - {Timestamp: 5, Value: 6}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, }, }, { name: "Partial Overlap", c: &Chunk{Samples: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, - {Timestamp: 5, Value: 6}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, }}, - start: 2, - end: 4, - expected: []logproto.PatternSample{{Timestamp: 3, Value: 4}}, + start: 3, + end: 5, + expected: []logproto.PatternSample{{Timestamp: 4, Value: 4}}, }, { name: "Single Element in Range", c: &Chunk{Samples: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, - {Timestamp: 5, Value: 6}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, }}, - start: 3, - end: 4, - expected: []logproto.PatternSample{{Timestamp: 3, Value: 4}}, + start: 4, + end: 5, + expected: []logproto.PatternSample{{Timestamp: 4, Value: 4}}, }, { name: "Start Before First Element", c: &Chunk{Samples: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, - {Timestamp: 5, Value: 6}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, }}, start: 0, - end: 4, + end: 5, expected: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, }, }, { name: "End After Last Element", c: &Chunk{Samples: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, - {Timestamp: 5, Value: 6}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, }}, - start: 4, + start: 5, end: 10, expected: []logproto.PatternSample{ - {Timestamp: 5, Value: 6}, + {Timestamp: 6, Value: 6}, }, }, { name: "Start and End Before First Element", c: &Chunk{Samples: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, - {Timestamp: 5, Value: 6}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, }}, start: 0, - end: 1, + end: 2, expected: nil, }, + { + name: "Higher resolution samples down-sampled to preceding step bucket", + c: &Chunk{Samples: []logproto.PatternSample{ + {Timestamp: 1, Value: 2}, + {Timestamp: 2, Value: 4}, + {Timestamp: 3, Value: 6}, + {Timestamp: 4, Value: 8}, + {Timestamp: 5, Value: 10}, + {Timestamp: 6, Value: 12}, + }}, + start: 1, + end: 6, + expected: []logproto.PatternSample{ + {Timestamp: 0, Value: 2}, + {Timestamp: 2, Value: 10}, + {Timestamp: 4, Value: 18}, + {Timestamp: 6, Value: 12}, + }, + }, + { + name: "Low resolution samples insert 0 values for empty steps", + c: &Chunk{Samples: []logproto.PatternSample{ + {Timestamp: 1, Value: 2}, + {Timestamp: 5, Value: 10}, + }}, + start: 1, + end: 6, + expected: []logproto.PatternSample{ + {Timestamp: 0, Value: 2}, + {Timestamp: 2, Value: 0}, + {Timestamp: 4, Value: 10}, + }, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - result := tc.c.ForRange(tc.start, tc.end) + result := tc.c.ForRange(tc.start, tc.end, model.Time(2)) if !reflect.DeepEqual(result, tc.expected) { t.Errorf("Expected %v, got %v", tc.expected, result) } diff --git a/pkg/pattern/drain/log_cluster.go b/pkg/pattern/drain/log_cluster.go index 26dda97a7a16d..af5932d16f706 100644 --- a/pkg/pattern/drain/log_cluster.go +++ b/pkg/pattern/drain/log_cluster.go @@ -35,8 +35,8 @@ func (c *LogCluster) merge(samples []*logproto.PatternSample) { c.Chunks.merge(samples) } -func (c *LogCluster) Iterator(from, through model.Time) iter.Iterator { - return c.Chunks.Iterator(c.String(), from, through) +func (c *LogCluster) Iterator(from, through, step model.Time) iter.Iterator { + return c.Chunks.Iterator(c.String(), from, through, step) } func (c *LogCluster) Samples() []*logproto.PatternSample { diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index 4b270a04ca391..ed60c3d1b4f8a 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "time" "github.com/go-kit/log" "github.com/grafana/dskit/httpgrpc" @@ -78,10 +79,14 @@ func (i *instance) Iterator(ctx context.Context, req *logproto.QueryPatternsRequ return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } from, through := util.RoundToMilliseconds(req.Start, req.End) + step := req.Step + if step < 10*time.Second.Milliseconds() { + step = 10 * time.Second.Milliseconds() + } var iters []iter.Iterator err = i.forMatchingStreams(matchers, func(s *stream) error { - iter, err := s.Iterator(ctx, from, through) + iter, err := s.Iterator(ctx, from, through, model.Time(step)) if err != nil { return err } diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go index 036ad2d65dc03..8321fce9f647c 100644 --- a/pkg/pattern/stream.go +++ b/pkg/pattern/stream.go @@ -54,7 +54,7 @@ func (s *stream) Push( return nil } -func (s *stream) Iterator(_ context.Context, from, through model.Time) (iter.Iterator, error) { +func (s *stream) Iterator(_ context.Context, from, through, step model.Time) (iter.Iterator, error) { // todo we should improve locking. s.mtx.Lock() defer s.mtx.Unlock() @@ -66,7 +66,7 @@ func (s *stream) Iterator(_ context.Context, from, through model.Time) (iter.Ite if cluster.String() == "" { continue } - iters = append(iters, cluster.Iterator(from, through)) + iters = append(iters, cluster.Iterator(from, through, step)) } return iter.NewMerge(iters...), nil }