diff --git a/pkg/loghttp/patterns.go b/pkg/loghttp/patterns.go index 0517f9c059963..8b855b2c951a7 100644 --- a/pkg/loghttp/patterns.go +++ b/pkg/loghttp/patterns.go @@ -9,6 +9,7 @@ import ( func ParsePatternsQuery(r *http.Request) (*logproto.QueryPatternsRequest, error) { req := &logproto.QueryPatternsRequest{} + req.Query = query(r) start, end, err := bounds(r) if err != nil { return nil, err @@ -16,6 +17,19 @@ func ParsePatternsQuery(r *http.Request) (*logproto.QueryPatternsRequest, error) req.Start = start req.End = end - req.Query = query(r) + calculatedStep, err := step(r, start, end) + if err != nil { + return nil, err + } + if calculatedStep <= 0 { + return nil, errZeroOrNegativeStep + } + // For safety, limit the number of returned points per timeseries. + // This is sufficient for 60s resolution for a week or 1h resolution for a year. + if (req.End.Sub(req.Start) / calculatedStep) > 11000 { + return nil, errStepTooSmall + } + req.Step = calculatedStep.Milliseconds() + return req, nil } diff --git a/pkg/loghttp/patterns_test.go b/pkg/loghttp/patterns_test.go new file mode 100644 index 0000000000000..8847153655da1 --- /dev/null +++ b/pkg/loghttp/patterns_test.go @@ -0,0 +1,122 @@ +package loghttp + +import ( + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/logproto" +) + +func TestParsePatternsQuery(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + path string + want *logproto.QueryPatternsRequest + wantErr bool + }{ + { + name: "should correctly parse valid params", + path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=5s", + want: &logproto.QueryPatternsRequest{ + Query: "{}", + Start: time.Unix(100, 0), + End: time.Unix(3600, 0), + Step: (5 * time.Second).Milliseconds(), + }, + }, + { + name: "should default empty step param to sensible step for the range", + path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000", + want: &logproto.QueryPatternsRequest{ + Query: "{}", + Start: time.Unix(100, 0), + End: time.Unix(3600, 0), + Step: (14 * time.Second).Milliseconds(), + }, + }, + { + name: "should default start to zero for empty start param", + path: "/loki/api/v1/patterns?query={}&end=3600000000000", + want: &logproto.QueryPatternsRequest{ + Query: "{}", + Start: time.Unix(0, 0), + End: time.Unix(3600, 0), + Step: (14 * time.Second).Milliseconds(), + }, + }, + { + name: "should accept step with no units as seconds", + path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=10", + want: &logproto.QueryPatternsRequest{ + Query: "{}", + Start: time.Unix(100, 0), + End: time.Unix(3600, 0), + Step: (10 * time.Second).Milliseconds(), + }, + }, + { + name: "should accept step as string duration in seconds", + path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=15s", + want: &logproto.QueryPatternsRequest{ + Query: "{}", + Start: time.Unix(100, 0), + End: time.Unix(3600, 0), + Step: (15 * time.Second).Milliseconds(), + }, + }, + { + name: "should correctly parse long duration for step", + path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=10h", + want: &logproto.QueryPatternsRequest{ + Query: "{}", + Start: time.Unix(100, 0), + End: time.Unix(3600, 0), + Step: (10 * time.Hour).Milliseconds(), + }, + }, + { + name: "should reject negative step value", + path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=-5s", + want: nil, + wantErr: true, + }, + { + name: "should reject very small step for big range", + path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=50ms", + want: nil, + wantErr: true, + }, + { + name: "should accept very small step for small range", + path: "/loki/api/v1/patterns?query={}&start=100000000000&end=110000000000&step=50ms", + want: &logproto.QueryPatternsRequest{ + Query: "{}", + Start: time.Unix(100, 0), + End: time.Unix(110, 0), + Step: (50 * time.Millisecond).Milliseconds(), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req, err := http.NewRequest(http.MethodGet, tt.path, nil) + require.NoError(t, err) + err = req.ParseForm() + require.NoError(t, err) + + got, err := ParsePatternsQuery(req) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + assert.Equalf(t, tt.want, got, "Incorrect response from input path: %s", tt.path) + }) + } +} diff --git a/pkg/logproto/compat.go b/pkg/logproto/compat.go index 212abc38633ec..cfb09e285836d 100644 --- a/pkg/logproto/compat.go +++ b/pkg/logproto/compat.go @@ -335,6 +335,7 @@ func (m *VolumeRequest) LogToSpan(sp opentracing.Span) { otlog.String("query", m.GetQuery()), otlog.String("start", timestamp.Time(int64(m.From)).String()), otlog.String("end", timestamp.Time(int64(m.Through)).String()), + otlog.String("step", time.Duration(m.Step).String()), ) } @@ -448,8 +449,6 @@ func (m *ShardsRequest) LogToSpan(sp opentracing.Span) { func (m *QueryPatternsRequest) GetCachingOptions() (res definitions.CachingOptions) { return } -func (m *QueryPatternsRequest) GetStep() int64 { return 0 } - func (m *QueryPatternsRequest) WithStartEnd(start, end time.Time) definitions.Request { clone := *m clone.Start = start @@ -469,9 +468,10 @@ func (m *QueryPatternsRequest) WithStartEndForCache(start, end time.Time) result func (m *QueryPatternsRequest) LogToSpan(sp opentracing.Span) { fields := []otlog.Field{ + otlog.String("query", m.GetQuery()), otlog.String("start", m.Start.String()), otlog.String("end", m.End.String()), - otlog.String("query", m.GetQuery()), + otlog.String("step", time.Duration(m.Step).String()), } sp.LogFields(fields...) } diff --git a/pkg/logproto/pattern.pb.go b/pkg/logproto/pattern.pb.go index b1b5755e9dfa0..a666a32850127 100644 --- a/pkg/logproto/pattern.pb.go +++ b/pkg/logproto/pattern.pb.go @@ -39,6 +39,7 @@ type QueryPatternsRequest struct { Query string `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` Start time.Time `protobuf:"bytes,2,opt,name=start,proto3,stdtime" json:"start"` End time.Time `protobuf:"bytes,3,opt,name=end,proto3,stdtime" json:"end"` + Step int64 `protobuf:"varint,4,opt,name=step,proto3" json:"step,omitempty"` } func (m *QueryPatternsRequest) Reset() { *m = QueryPatternsRequest{} } @@ -94,6 +95,13 @@ func (m *QueryPatternsRequest) GetEnd() time.Time { return time.Time{} } +func (m *QueryPatternsRequest) GetStep() int64 { + if m != nil { + return m.Step + } + return 0 +} + type QueryPatternsResponse struct { Series []*PatternSeries `protobuf:"bytes,1,rep,name=series,proto3" json:"series,omitempty"` } @@ -242,37 +250,38 @@ func init() { func init() { proto.RegisterFile("pkg/logproto/pattern.proto", fileDescriptor_aaf4192acc66a4ea) } var fileDescriptor_aaf4192acc66a4ea = []byte{ - // 470 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0x31, 0x6f, 0xd4, 0x30, - 0x14, 0x8e, 0x1b, 0xae, 0xd7, 0xba, 0x62, 0x31, 0x57, 0x88, 0x82, 0xe4, 0x9c, 0xb2, 0x70, 0x53, - 0x0c, 0x57, 0x09, 0x24, 0xc6, 0x9b, 0x18, 0x40, 0x2a, 0x81, 0x09, 0xc1, 0x90, 0x6b, 0x5d, 0xe7, - 0xd4, 0x38, 0x4e, 0x63, 0xbb, 0x12, 0x1b, 0x3f, 0xe1, 0x7e, 0x02, 0x23, 0x3f, 0xa5, 0xe3, 0x8d, - 0x15, 0x43, 0xe1, 0x72, 0x0b, 0x63, 0x7f, 0x02, 0x8a, 0xed, 0xf4, 0xae, 0x15, 0x1d, 0x58, 0x12, - 0xbf, 0xf7, 0x7d, 0xef, 0xf3, 0xf7, 0xde, 0x33, 0x0c, 0xab, 0x53, 0x46, 0x0a, 0xc1, 0xaa, 0x5a, - 0x28, 0x41, 0xaa, 0x4c, 0x29, 0x5a, 0x97, 0x89, 0x89, 0xd0, 0x4e, 0x97, 0x0f, 0x07, 0x4c, 0x30, - 0x61, 0x29, 0xed, 0xc9, 0xe2, 0x61, 0xc4, 0x84, 0x60, 0x05, 0x25, 0x26, 0x9a, 0xea, 0x13, 0xa2, - 0x66, 0x9c, 0x4a, 0x95, 0xf1, 0xca, 0x11, 0x9e, 0xde, 0x12, 0xef, 0x0e, 0x0e, 0x7c, 0xd4, 0x82, - 0x95, 0x96, 0xb9, 0xf9, 0xd8, 0x64, 0xfc, 0x1d, 0xc0, 0xc1, 0x7b, 0x4d, 0xeb, 0xaf, 0x87, 0xd6, - 0x89, 0x4c, 0xe9, 0x99, 0xa6, 0x52, 0xa1, 0x01, 0xec, 0x9d, 0xb5, 0xf9, 0x00, 0x0c, 0xc1, 0x68, - 0x37, 0xb5, 0x01, 0x7a, 0x0d, 0x7b, 0x52, 0x65, 0xb5, 0x0a, 0xb6, 0x86, 0x60, 0xb4, 0x37, 0x0e, - 0x13, 0xeb, 0x28, 0xe9, 0x1c, 0x25, 0x1f, 0x3b, 0x47, 0x93, 0x9d, 0x8b, 0xab, 0xc8, 0x9b, 0xff, - 0x8a, 0x40, 0x6a, 0x4b, 0xd0, 0x4b, 0xe8, 0xd3, 0xf2, 0x38, 0xf0, 0xff, 0xa3, 0xb2, 0x2d, 0x88, - 0xdf, 0xc0, 0xfd, 0x3b, 0x0e, 0x65, 0x25, 0x4a, 0x49, 0x11, 0x81, 0xdb, 0x92, 0xd6, 0x33, 0x2a, - 0x03, 0x30, 0xf4, 0x47, 0x7b, 0xe3, 0x27, 0xc9, 0x4d, 0xc7, 0x8e, 0xfb, 0xc1, 0xc0, 0xa9, 0xa3, - 0xc5, 0x9f, 0xe1, 0xc3, 0x5b, 0x00, 0x0a, 0x60, 0xdf, 0x6d, 0xc0, 0xb5, 0xd9, 0x85, 0xe8, 0x05, - 0xec, 0xcb, 0x8c, 0x57, 0x05, 0x95, 0xc1, 0xd6, 0x7d, 0xe2, 0x06, 0x4f, 0x3b, 0x5e, 0xac, 0xd6, - 0xea, 0x26, 0x83, 0xde, 0xc1, 0xdd, 0x9b, 0x05, 0x19, 0x7d, 0x7f, 0x42, 0xda, 0xd6, 0x7e, 0x5e, - 0x45, 0xcf, 0xd8, 0x4c, 0xe5, 0x7a, 0x9a, 0x1c, 0x09, 0xde, 0x6e, 0x93, 0x53, 0x95, 0x53, 0x2d, - 0xc9, 0x91, 0xe0, 0x5c, 0x94, 0x84, 0x8b, 0x63, 0x5a, 0x98, 0x81, 0xa4, 0x6b, 0x85, 0x76, 0x23, - 0xe7, 0x59, 0xa1, 0xa9, 0x99, 0xbd, 0x9f, 0xda, 0x60, 0x3c, 0x07, 0xb0, 0xef, 0xae, 0x45, 0xaf, - 0xe0, 0x83, 0x43, 0x2d, 0x73, 0xb4, 0xbf, 0xe1, 0x55, 0xcb, 0xdc, 0xad, 0x34, 0x7c, 0x7c, 0x37, - 0x6d, 0xe7, 0x18, 0x7b, 0xe8, 0x2d, 0xec, 0x99, 0x11, 0x23, 0xbc, 0xa6, 0xfc, 0xeb, 0x55, 0x84, - 0xd1, 0xbd, 0x78, 0xa7, 0xf5, 0x1c, 0x4c, 0xbe, 0x2c, 0x96, 0xd8, 0xbb, 0x5c, 0x62, 0xef, 0x7a, - 0x89, 0xc1, 0xb7, 0x06, 0x83, 0x1f, 0x0d, 0x06, 0x17, 0x0d, 0x06, 0x8b, 0x06, 0x83, 0xdf, 0x0d, - 0x06, 0x7f, 0x1a, 0xec, 0x5d, 0x37, 0x18, 0xcc, 0x57, 0xd8, 0x5b, 0xac, 0xb0, 0x77, 0xb9, 0xc2, - 0xde, 0xa7, 0xcd, 0x91, 0xb0, 0x3a, 0x3b, 0xc9, 0xca, 0x8c, 0x14, 0xe2, 0x74, 0x46, 0xce, 0x0f, - 0xc8, 0xe6, 0xb3, 0x9e, 0x6e, 0x9b, 0xdf, 0xc1, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xec, 0xd6, - 0xbc, 0xfc, 0x4a, 0x03, 0x00, 0x00, + // 483 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0xb1, 0x6e, 0xd3, 0x40, + 0x18, 0xf6, 0xd5, 0x49, 0xd3, 0x5e, 0xc5, 0x72, 0xa4, 0x60, 0x19, 0xe9, 0x1c, 0x79, 0x21, 0x93, + 0x0f, 0x52, 0x09, 0x24, 0xc6, 0x4c, 0x0c, 0x20, 0x15, 0xc3, 0x84, 0x60, 0x70, 0xda, 0xbf, 0xb6, + 0x55, 0xdb, 0xe7, 0xfa, 0xee, 0x2a, 0xb1, 0xf1, 0x08, 0x79, 0x0c, 0x1e, 0x80, 0x87, 0xe8, 0x98, + 0xb1, 0x62, 0x28, 0xc4, 0x59, 0x18, 0xfb, 0x08, 0xc8, 0x77, 0x76, 0x93, 0x56, 0x74, 0xe8, 0x92, + 0xdc, 0xff, 0x7f, 0xdf, 0xff, 0xf9, 0xbb, 0xff, 0x3b, 0xec, 0x96, 0xa7, 0x31, 0xcb, 0x78, 0x5c, + 0x56, 0x5c, 0x72, 0x56, 0x46, 0x52, 0x42, 0x55, 0x04, 0xba, 0x22, 0x3b, 0x5d, 0xdf, 0x1d, 0xc6, + 0x3c, 0xe6, 0x86, 0xd2, 0x9c, 0x0c, 0xee, 0x7a, 0x31, 0xe7, 0x71, 0x06, 0x4c, 0x57, 0x33, 0x75, + 0xc2, 0x64, 0x9a, 0x83, 0x90, 0x51, 0x5e, 0xb6, 0x84, 0x67, 0xb7, 0xc4, 0xbb, 0x43, 0x0b, 0x3e, + 0x6e, 0xc0, 0x52, 0x89, 0x44, 0xff, 0x98, 0xa6, 0xff, 0x13, 0xe1, 0xe1, 0x07, 0x05, 0xd5, 0xb7, + 0x43, 0xe3, 0x44, 0x84, 0x70, 0xa6, 0x40, 0x48, 0x32, 0xc4, 0xfd, 0xb3, 0xa6, 0xef, 0xa0, 0x11, + 0x1a, 0xef, 0x86, 0xa6, 0x20, 0x6f, 0x70, 0x5f, 0xc8, 0xa8, 0x92, 0xce, 0xd6, 0x08, 0x8d, 0xf7, + 0x26, 0x6e, 0x60, 0x1c, 0x05, 0x9d, 0xa3, 0xe0, 0x53, 0xe7, 0x68, 0xba, 0x73, 0x71, 0xe5, 0x59, + 0xf3, 0xdf, 0x1e, 0x0a, 0xcd, 0x08, 0x79, 0x85, 0x6d, 0x28, 0x8e, 0x1d, 0xfb, 0x01, 0x93, 0xcd, + 0x00, 0x21, 0xb8, 0x27, 0x24, 0x94, 0x4e, 0x6f, 0x84, 0xc6, 0x76, 0xa8, 0xcf, 0xfe, 0x5b, 0xbc, + 0x7f, 0xc7, 0xb5, 0x28, 0x79, 0x21, 0x80, 0x30, 0xbc, 0x2d, 0xa0, 0x4a, 0x41, 0x38, 0x68, 0x64, + 0x8f, 0xf7, 0x26, 0x4f, 0x83, 0x9b, 0x2d, 0xb4, 0xdc, 0x8f, 0x1a, 0x0e, 0x5b, 0x9a, 0xff, 0x05, + 0x3f, 0xba, 0x05, 0x10, 0x07, 0x0f, 0xda, 0x54, 0xda, 0xab, 0x77, 0x25, 0x79, 0x89, 0x07, 0x22, + 0xca, 0xcb, 0x0c, 0x84, 0xb3, 0x75, 0x9f, 0xb8, 0xc6, 0xc3, 0x8e, 0xe7, 0xcb, 0xb5, 0xba, 0xee, + 0x90, 0xf7, 0x78, 0xf7, 0x26, 0x34, 0xad, 0x6f, 0x4f, 0x59, 0x73, 0xdd, 0x5f, 0x57, 0xde, 0xf3, + 0x38, 0x95, 0x89, 0x9a, 0x05, 0x47, 0x3c, 0x6f, 0x12, 0xce, 0x41, 0x26, 0xa0, 0x04, 0x3b, 0xe2, + 0x79, 0xce, 0x0b, 0x96, 0xf3, 0x63, 0xc8, 0xf4, 0x92, 0xc2, 0xb5, 0x42, 0x93, 0xd2, 0x79, 0x94, + 0x29, 0xd0, 0x79, 0xd8, 0xa1, 0x29, 0x26, 0x73, 0x84, 0x07, 0xed, 0x67, 0xc9, 0x6b, 0xdc, 0x3b, + 0x54, 0x22, 0x21, 0xfb, 0x1b, 0x5e, 0x95, 0x48, 0xda, 0x98, 0xdd, 0x27, 0x77, 0xdb, 0x66, 0x8f, + 0xbe, 0x45, 0xde, 0xe1, 0xbe, 0x5e, 0x31, 0xa1, 0x6b, 0xca, 0xff, 0x5e, 0x8a, 0xeb, 0xdd, 0x8b, + 0x77, 0x5a, 0x2f, 0xd0, 0xf4, 0xeb, 0x62, 0x49, 0xad, 0xcb, 0x25, 0xb5, 0xae, 0x97, 0x14, 0x7d, + 0xaf, 0x29, 0xfa, 0x51, 0x53, 0x74, 0x51, 0x53, 0xb4, 0xa8, 0x29, 0xfa, 0x53, 0x53, 0xf4, 0xb7, + 0xa6, 0xd6, 0x75, 0x4d, 0xd1, 0x7c, 0x45, 0xad, 0xc5, 0x8a, 0x5a, 0x97, 0x2b, 0x6a, 0x7d, 0xde, + 0x5c, 0x49, 0x5c, 0x45, 0x27, 0x51, 0x11, 0xb1, 0x8c, 0x9f, 0xa6, 0xec, 0xfc, 0x80, 0x6d, 0x3e, + 0xf5, 0xd9, 0xb6, 0xfe, 0x3b, 0xf8, 0x17, 0x00, 0x00, 0xff, 0xff, 0x3b, 0x4f, 0x5c, 0x50, 0x5e, + 0x03, 0x00, 0x00, } func (this *QueryPatternsRequest) Equal(that interface{}) bool { @@ -303,6 +312,9 @@ func (this *QueryPatternsRequest) Equal(that interface{}) bool { if !this.End.Equal(that1.End) { return false } + if this.Step != that1.Step { + return false + } return true } func (this *QueryPatternsResponse) Equal(that interface{}) bool { @@ -397,11 +409,12 @@ func (this *QueryPatternsRequest) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 7) + s := make([]string, 0, 8) s = append(s, "&logproto.QueryPatternsRequest{") s = append(s, "Query: "+fmt.Sprintf("%#v", this.Query)+",\n") s = append(s, "Start: "+fmt.Sprintf("%#v", this.Start)+",\n") s = append(s, "End: "+fmt.Sprintf("%#v", this.End)+",\n") + s = append(s, "Step: "+fmt.Sprintf("%#v", this.Step)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -614,6 +627,11 @@ func (m *QueryPatternsRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Step != 0 { + i = encodeVarintPattern(dAtA, i, uint64(m.Step)) + i-- + dAtA[i] = 0x20 + } n1, err1 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.End, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.End):]) if err1 != nil { return 0, err1 @@ -779,6 +797,9 @@ func (m *QueryPatternsRequest) Size() (n int) { n += 1 + l + sovPattern(uint64(l)) l = github_com_gogo_protobuf_types.SizeOfStdTime(m.End) n += 1 + l + sovPattern(uint64(l)) + if m.Step != 0 { + n += 1 + sovPattern(uint64(m.Step)) + } return n } @@ -845,6 +866,7 @@ func (this *QueryPatternsRequest) String() string { `Query:` + fmt.Sprintf("%v", this.Query) + `,`, `Start:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Start), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, `End:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.End), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `Step:` + fmt.Sprintf("%v", this.Step) + `,`, `}`, }, "") return s @@ -1026,6 +1048,25 @@ func (m *QueryPatternsRequest) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Step", wireType) + } + m.Step = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPattern + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Step |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipPattern(dAtA[iNdEx:]) diff --git a/pkg/logproto/pattern.proto b/pkg/logproto/pattern.proto index 66a6c017b1926..e92a201b3a8b1 100644 --- a/pkg/logproto/pattern.proto +++ b/pkg/logproto/pattern.proto @@ -24,6 +24,7 @@ message QueryPatternsRequest { (gogoproto.stdtime) = true, (gogoproto.nullable) = false ]; + int64 step = 4; } message QueryPatternsResponse { diff --git a/pkg/pattern/drain/chunk.go b/pkg/pattern/drain/chunk.go index e438bb6b7c561..9b1e34e2e3a19 100644 --- a/pkg/pattern/drain/chunk.go +++ b/pkg/pattern/drain/chunk.go @@ -11,7 +11,7 @@ import ( ) const ( - timeResolution = model.Time(int64(time.Second*10) / 1e6) + TimeResolution = model.Time(int64(time.Second*10) / 1e6) defaultVolumeSize = 500 @@ -25,7 +25,7 @@ type Chunk struct { } func newChunk(ts model.Time) Chunk { - maxSize := int(maxChunkTime.Nanoseconds()/timeResolution.UnixNano()) + 1 + maxSize := int(maxChunkTime.Nanoseconds()/TimeResolution.UnixNano()) + 1 v := Chunk{Samples: make([]logproto.PatternSample, 1, maxSize)} v.Samples[0] = logproto.PatternSample{ Timestamp: ts, @@ -43,9 +43,9 @@ func (c Chunk) spaceFor(ts model.Time) bool { } // ForRange returns samples with only the values -// in the given range [start:end). -// start and end are in milliseconds since epoch. -func (c Chunk) ForRange(start, end model.Time) []logproto.PatternSample { +// in the given range [start:end) and aggregates them by step duration. +// start and end are in milliseconds since epoch. step is a duration in milliseconds. +func (c Chunk) ForRange(start, end, step model.Time) []logproto.PatternSample { if len(c.Samples) == 0 { return nil } @@ -66,11 +66,36 @@ func (c Chunk) ForRange(start, end model.Time) []logproto.PatternSample { return c.Samples[i].Timestamp >= end }) } - return c.Samples[lo:hi] + if step == TimeResolution { + return c.Samples[lo:hi] + } + + // Re-scale samples into step-sized buckets + currentStep := truncateTimestamp(c.Samples[lo].Timestamp, step) + aggregatedSamples := make([]logproto.PatternSample, 0, ((c.Samples[hi-1].Timestamp-currentStep)/step)+1) + aggregatedSamples = append(aggregatedSamples, logproto.PatternSample{ + Timestamp: currentStep, + Value: 0, + }) + for _, sample := range c.Samples[lo:hi] { + if sample.Timestamp >= currentStep+step { + stepForSample := truncateTimestamp(sample.Timestamp, step) + for i := currentStep + step; i <= stepForSample; i += step { + aggregatedSamples = append(aggregatedSamples, logproto.PatternSample{ + Timestamp: i, + Value: 0, + }) + } + currentStep = stepForSample + } + aggregatedSamples[len(aggregatedSamples)-1].Value += sample.Value + } + + return aggregatedSamples } func (c *Chunks) Add(ts model.Time) { - t := truncateTimestamp(ts) + t := truncateTimestamp(ts, TimeResolution) if len(*c) == 0 { *c = append(*c, newChunk(t)) @@ -91,10 +116,10 @@ func (c *Chunks) Add(ts model.Time) { }) } -func (c Chunks) Iterator(pattern string, from, through model.Time) iter.Iterator { +func (c Chunks) Iterator(pattern string, from, through, step model.Time) iter.Iterator { iters := make([]iter.Iterator, 0, len(c)) for _, chunk := range c { - samples := chunk.ForRange(from, through) + samples := chunk.ForRange(from, through, step) if len(samples) == 0 { continue } @@ -173,4 +198,4 @@ func (c *Chunks) size() int { return size } -func truncateTimestamp(ts model.Time) model.Time { return ts - ts%timeResolution } +func truncateTimestamp(ts, step model.Time) model.Time { return ts - ts%step } diff --git a/pkg/pattern/drain/chunk_test.go b/pkg/pattern/drain/chunk_test.go index e75c70411a7be..4863a6629729a 100644 --- a/pkg/pattern/drain/chunk_test.go +++ b/pkg/pattern/drain/chunk_test.go @@ -13,24 +13,24 @@ import ( func TestAdd(t *testing.T) { cks := Chunks{} - cks.Add(timeResolution + 1) - cks.Add(timeResolution + 2) - cks.Add(2*timeResolution + 1) + cks.Add(TimeResolution + 1) + cks.Add(TimeResolution + 2) + cks.Add(2*TimeResolution + 1) require.Equal(t, 1, len(cks)) require.Equal(t, 2, len(cks[0].Samples)) - cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds()) + timeResolution + 1) + cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds()) + TimeResolution + 1) require.Equal(t, 2, len(cks)) require.Equal(t, 1, len(cks[1].Samples)) } func TestIterator(t *testing.T) { cks := Chunks{} - cks.Add(timeResolution + 1) - cks.Add(timeResolution + 2) - cks.Add(2*timeResolution + 1) - cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds()) + timeResolution + 1) + cks.Add(TimeResolution + 1) + cks.Add(TimeResolution + 2) + cks.Add(2*TimeResolution + 1) + cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds()) + TimeResolution + 1) - it := cks.Iterator("test", model.Time(0), model.Time(time.Hour.Nanoseconds())) + it := cks.Iterator("test", model.Time(0), model.Time(time.Hour.Nanoseconds()), TimeResolution) require.NotNil(t, it) var samples []logproto.PatternSample @@ -64,9 +64,9 @@ func TestForRange(t *testing.T) { { name: "No Overlap", c: &Chunk{Samples: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, - {Timestamp: 5, Value: 6}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, }}, start: 10, end: 20, @@ -75,86 +75,120 @@ func TestForRange(t *testing.T) { { name: "Complete Overlap", c: &Chunk{Samples: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, - {Timestamp: 5, Value: 6}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, }}, start: 0, end: 10, expected: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, - {Timestamp: 5, Value: 6}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, }, }, { name: "Partial Overlap", c: &Chunk{Samples: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, - {Timestamp: 5, Value: 6}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, }}, - start: 2, - end: 4, - expected: []logproto.PatternSample{{Timestamp: 3, Value: 4}}, + start: 3, + end: 5, + expected: []logproto.PatternSample{{Timestamp: 4, Value: 4}}, }, { name: "Single Element in Range", c: &Chunk{Samples: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, - {Timestamp: 5, Value: 6}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, }}, - start: 3, - end: 4, - expected: []logproto.PatternSample{{Timestamp: 3, Value: 4}}, + start: 4, + end: 5, + expected: []logproto.PatternSample{{Timestamp: 4, Value: 4}}, }, { name: "Start Before First Element", c: &Chunk{Samples: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, - {Timestamp: 5, Value: 6}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, }}, start: 0, - end: 4, + end: 5, expected: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, }, }, { name: "End After Last Element", c: &Chunk{Samples: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, - {Timestamp: 5, Value: 6}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, }}, - start: 4, + start: 5, end: 10, expected: []logproto.PatternSample{ - {Timestamp: 5, Value: 6}, + {Timestamp: 6, Value: 6}, }, }, { name: "Start and End Before First Element", c: &Chunk{Samples: []logproto.PatternSample{ - {Timestamp: 1, Value: 2}, - {Timestamp: 3, Value: 4}, - {Timestamp: 5, Value: 6}, + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, }}, start: 0, - end: 1, + end: 2, expected: nil, }, + { + name: "Higher resolution samples down-sampled to preceding step bucket", + c: &Chunk{Samples: []logproto.PatternSample{ + {Timestamp: 1, Value: 2}, + {Timestamp: 2, Value: 4}, + {Timestamp: 3, Value: 6}, + {Timestamp: 4, Value: 8}, + {Timestamp: 5, Value: 10}, + {Timestamp: 6, Value: 12}, + }}, + start: 1, + end: 6, + expected: []logproto.PatternSample{ + {Timestamp: 0, Value: 2}, + {Timestamp: 2, Value: 10}, + {Timestamp: 4, Value: 18}, + {Timestamp: 6, Value: 12}, + }, + }, + { + name: "Low resolution samples insert 0 values for empty steps", + c: &Chunk{Samples: []logproto.PatternSample{ + {Timestamp: 1, Value: 2}, + {Timestamp: 5, Value: 10}, + }}, + start: 1, + end: 6, + expected: []logproto.PatternSample{ + {Timestamp: 0, Value: 2}, + {Timestamp: 2, Value: 0}, + {Timestamp: 4, Value: 10}, + }, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - result := tc.c.ForRange(tc.start, tc.end) + result := tc.c.ForRange(tc.start, tc.end, model.Time(2)) if !reflect.DeepEqual(result, tc.expected) { t.Errorf("Expected %v, got %v", tc.expected, result) } + require.Equal(t, len(result), cap(result), "Returned slice wasn't created at the correct capacity") }) } } diff --git a/pkg/pattern/drain/log_cluster.go b/pkg/pattern/drain/log_cluster.go index 26dda97a7a16d..af5932d16f706 100644 --- a/pkg/pattern/drain/log_cluster.go +++ b/pkg/pattern/drain/log_cluster.go @@ -35,8 +35,8 @@ func (c *LogCluster) merge(samples []*logproto.PatternSample) { c.Chunks.merge(samples) } -func (c *LogCluster) Iterator(from, through model.Time) iter.Iterator { - return c.Chunks.Iterator(c.String(), from, through) +func (c *LogCluster) Iterator(from, through, step model.Time) iter.Iterator { + return c.Chunks.Iterator(c.String(), from, through, step) } func (c *LogCluster) Samples() []*logproto.PatternSample { diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index 4b270a04ca391..7ac0099edec36 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/loki/v3/pkg/ingester/index" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/pattern/drain" "github.com/grafana/loki/v3/pkg/pattern/iter" "github.com/grafana/loki/v3/pkg/util" ) @@ -78,10 +79,14 @@ func (i *instance) Iterator(ctx context.Context, req *logproto.QueryPatternsRequ return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } from, through := util.RoundToMilliseconds(req.Start, req.End) + step := model.Time(req.Step) + if step < drain.TimeResolution { + step = drain.TimeResolution + } var iters []iter.Iterator err = i.forMatchingStreams(matchers, func(s *stream) error { - iter, err := s.Iterator(ctx, from, through) + iter, err := s.Iterator(ctx, from, through, step) if err != nil { return err } diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go index 036ad2d65dc03..8321fce9f647c 100644 --- a/pkg/pattern/stream.go +++ b/pkg/pattern/stream.go @@ -54,7 +54,7 @@ func (s *stream) Push( return nil } -func (s *stream) Iterator(_ context.Context, from, through model.Time) (iter.Iterator, error) { +func (s *stream) Iterator(_ context.Context, from, through, step model.Time) (iter.Iterator, error) { // todo we should improve locking. s.mtx.Lock() defer s.mtx.Unlock() @@ -66,7 +66,7 @@ func (s *stream) Iterator(_ context.Context, from, through model.Time) (iter.Ite if cluster.String() == "" { continue } - iters = append(iters, cluster.Iterator(from, through)) + iters = append(iters, cluster.Iterator(from, through, step)) } return iter.NewMerge(iters...), nil } diff --git a/pkg/pattern/stream_test.go b/pkg/pattern/stream_test.go index 6d3b0010d3254..cd76336b2e600 100644 --- a/pkg/pattern/stream_test.go +++ b/pkg/pattern/stream_test.go @@ -34,7 +34,7 @@ func TestAddStream(t *testing.T) { }, }) require.NoError(t, err) - it, err := stream.Iterator(context.Background(), model.Earliest, model.Latest) + it, err := stream.Iterator(context.Background(), model.Earliest, model.Latest, model.Time(time.Second)) require.NoError(t, err) res, err := iter.ReadAll(it) require.NoError(t, err) @@ -68,7 +68,7 @@ func TestPruneStream(t *testing.T) { }) require.NoError(t, err) require.Equal(t, false, stream.prune(time.Hour)) - it, err := stream.Iterator(context.Background(), model.Earliest, model.Latest) + it, err := stream.Iterator(context.Background(), model.Earliest, model.Latest, model.Time(time.Second)) require.NoError(t, err) res, err := iter.ReadAll(it) require.NoError(t, err) diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 09997156128ef..58b519289f62f 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -996,9 +996,10 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht return req.WithContext(ctx), nil case *logproto.QueryPatternsRequest: params := url.Values{ + "query": []string{request.GetQuery()}, "start": []string{fmt.Sprintf("%d", request.Start.UnixNano())}, "end": []string{fmt.Sprintf("%d", request.End.UnixNano())}, - "query": []string{request.GetQuery()}, + "step": []string{fmt.Sprintf("%d", request.GetStep())}, } u := &url.URL{