diff --git a/pkg/logql/range_vector.go b/pkg/logql/range_vector.go index 200f1480add7e..141d4865c46c7 100644 --- a/pkg/logql/range_vector.go +++ b/pkg/logql/range_vector.go @@ -108,20 +108,20 @@ func NewBatchRangeVectorIterator( end: end, current: start - step, // first loop iteration will set it to start offset: offset, - metrics: map[string]labels.Labels{}, - window: map[string]*promql.Series{}, + metrics: map[string]labels.Labels{}, + window: map[string]*promql.Series{}, agg: agg, } } func (r *batchRangeVectorIterator) Next() bool { // slides the range window to the next position - r.current = r.current + r.step // first current will be 5 min before start + r.current = r.current + r.step if r.current > r.end { return false } rangeEnd := r.current - rangeStart := rangeEnd - r.selRange // in nanoseconds + rangeStart := rangeEnd - r.selRange // load samples r.popBack(rangeStart) r.load(rangeStart, rangeEnd) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 9446b351aab82..ce826f0752d4b 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -331,7 +331,7 @@ type Loki struct { distributor *distributor.Distributor Ingester ingester.Interface PatternIngester *pattern.Ingester - PatternRingClient *pattern.RingClient + PatternRingClient pattern.RingClient Querier querier.Querier cacheGenerationLoader queryrangebase.CacheGenNumberLoader querierAPI *querier.QuerierAPI diff --git a/pkg/pattern/ingester_querier.go b/pkg/pattern/ingester_querier.go index e9b42010a56ea..41fce2d123983 100644 --- a/pkg/pattern/ingester_querier.go +++ b/pkg/pattern/ingester_querier.go @@ -4,10 +4,8 @@ import ( "context" "errors" "math" - "net/http" "github.com/go-kit/log" - "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/ring" "github.com/prometheus/client_golang/prometheus" @@ -26,14 +24,14 @@ type IngesterQuerier struct { cfg Config logger log.Logger - ringClient *RingClient + ringClient RingClient registerer prometheus.Registerer } func NewIngesterQuerier( cfg Config, - 
ringClient *RingClient, + ringClient RingClient, metricsNamespace string, registerer prometheus.Registerer, logger log.Logger, @@ -48,21 +46,31 @@ func NewIngesterQuerier( func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) { // validate that a supported query was provided - // TODO(twhitney): validate metric queries don't have filters var expr syntax.Expr _, err := syntax.ParseMatchers(req.Query, true) if err != nil { expr, err = syntax.ParseSampleExpr(req.Query) if err != nil { - return nil, httpgrpc.Errorf(http.StatusBadRequest, ErrParseQuery.Error()) + return nil, ErrParseQuery } + var selector syntax.LogSelectorExpr switch expr.(type) { - case *syntax.VectorAggregationExpr, *syntax.RangeAggregationExpr: - break + case *syntax.VectorAggregationExpr: + selector, err = expr.(*syntax.VectorAggregationExpr).Selector() + case *syntax.RangeAggregationExpr: + selector, err = expr.(*syntax.RangeAggregationExpr).Selector() default: return nil, ErrParseQuery } + + if err != nil { + return nil, err + } + + if selector == nil || selector.HasFilter() { + return nil, ErrParseQuery + } } resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.PatternClient) (interface{}, error) { @@ -118,7 +126,7 @@ func prunePatterns( // ForAllIngesters runs f, in parallel, for all ingesters func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.PatternClient) (interface{}, error)) ([]ResponseFromIngesters, error) { - replicationSet, err := q.ringClient.ring.GetReplicationSetForOperation(ring.Read) + replicationSet, err := q.ringClient.Ring().GetReplicationSetForOperation(ring.Read) if err != nil { return nil, err } @@ -137,7 +145,7 @@ func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet // Nothing here } results, err := ring.DoUntilQuorum(ctx, replicationSet, cfg, func(ctx context.Context, ingester *ring.InstanceDesc) 
(ResponseFromIngesters, error) { - client, err := q.ringClient.pool.GetClientFor(ingester.Addr) + client, err := q.ringClient.Pool().GetClientFor(ingester.Addr) if err != nil { return ResponseFromIngesters{addr: ingester.Addr}, err } diff --git a/pkg/pattern/ingester_querier_test.go b/pkg/pattern/ingester_querier_test.go index c83da105ec2c6..8090596384012 100644 --- a/pkg/pattern/ingester_querier_test.go +++ b/pkg/pattern/ingester_querier_test.go @@ -2,11 +2,17 @@ package pattern import ( "bufio" + "context" "os" "testing" + "time" + "github.com/go-kit/log" "github.com/stretchr/testify/require" + "github.com/grafana/dskit/ring" + ring_client "github.com/grafana/dskit/ring/client" + "github.com/grafana/dskit/services" "github.com/grafana/loki/v3/pkg/logproto" ) @@ -39,3 +45,155 @@ func Test_prunePatterns(t *testing.T) { require.Equal(t, expectedPatterns, patterns) } + +func Test_Patterns(t *testing.T) { + t.Run("it rejects metric queries with filters", func(t *testing.T) { + q := &IngesterQuerier{ + cfg: Config{}, + logger: log.NewNopLogger(), + ringClient: &fakeRingClient{}, + registerer: nil, + } + for _, query := range []string{ + `count_over_time({foo="bar"} |= "baz" [5m])`, + `count_over_time({foo="bar"} != "baz" [5m])`, + `count_over_time({foo="bar"} =~ "baz" [5m])`, + `count_over_time({foo="bar"} !~ "baz" [5m])`, + `count_over_time({foo="bar"} | logfmt | color=blue [5m])`, + `sum(count_over_time({foo="bar"} |= "baz" [5m]))`, + `sum by label(count_over_time({foo="bar"} |= "baz" [5m]))`, + `bytes_over_time({foo="bar"} |= "baz" [5m])`, + } { + _, err := q.Patterns( + context.Background(), + &logproto.QueryPatternsRequest{ + Query: query, + }, + ) + require.Error(t, err, query) + require.ErrorIs(t, err, ErrParseQuery) + + } + }) + + t.Run("accepts log selector queries and count and bytes metric queries", func(t *testing.T) { + q := &IngesterQuerier{ + cfg: Config{}, + logger: log.NewNopLogger(), + ringClient: &fakeRingClient{}, + registerer: nil, + } + for _, 
query := range []string{ + `{foo="bar"}`, + `count_over_time({foo="bar"}[5m])`, + `bytes_over_time({foo="bar"}[5m])`, + `sum(count_over_time({foo="bar"}[5m]))`, + `sum(bytes_over_time({foo="bar"}[5m]))`, + `sum by (level)(count_over_time({foo="bar"}[5m]))`, + `sum by (level)(bytes_over_time({foo="bar"}[5m]))`, + } { + _, err := q.Patterns( + context.Background(), + &logproto.QueryPatternsRequest{ + Query: query, + }, + ) + require.NoError(t, err, query) + } + }) +} + +type fakeRingClient struct{} + +func (f *fakeRingClient) Pool() *ring_client.Pool { + panic("not implemented") +} + +func (f *fakeRingClient) StartAsync(ctx context.Context) error { + panic("not implemented") +} + +func (f *fakeRingClient) AwaitRunning(ctx context.Context) error { + panic("not implemented") +} + +func (f *fakeRingClient) StopAsync() { + panic("not implemented") +} + +func (f *fakeRingClient) AwaitTerminated(ctx context.Context) error { + panic("not implemented") +} + +func (f *fakeRingClient) FailureCase() error { + panic("not implemented") +} + +func (f *fakeRingClient) State() services.State { + panic("not implemented") +} + +func (f *fakeRingClient) AddListener(listener services.Listener) { + panic("not implemented") +} + +func (f *fakeRingClient) Ring() ring.ReadRing { + return &fakeRing{} +} + +type fakeRing struct{} + +func (f *fakeRing) Get( + key uint32, + op ring.Operation, + bufDescs []ring.InstanceDesc, + bufHosts []string, + bufZones []string, +) (ring.ReplicationSet, error) { + panic("not implemented") +} + +func (f *fakeRing) GetAllHealthy(op ring.Operation) (ring.ReplicationSet, error) { + panic("not implemented") +} + +func (f *fakeRing) GetReplicationSetForOperation(op ring.Operation) (ring.ReplicationSet, error) { + return ring.ReplicationSet{}, nil +} + +func (f *fakeRing) ReplicationFactor() int { + panic("not implemented") +} + +func (f *fakeRing) InstancesCount() int { + panic("not implemented") +} + +func (f *fakeRing) ShuffleShard(identifier string, size int) 
ring.ReadRing { + panic("not implemented") +} + +func (f *fakeRing) GetInstanceState(instanceID string) (ring.InstanceState, error) { + panic("not implemented") +} + +func (f *fakeRing) ShuffleShardWithLookback( + identifier string, + size int, + lookbackPeriod time.Duration, + now time.Time, +) ring.ReadRing { + panic("not implemented") +} + +func (f *fakeRing) HasInstance(instanceID string) bool { + panic("not implemented") +} + +func (f *fakeRing) CleanupShuffleShardCache(identifier string) { + panic("not implemented") +} + +func (f *fakeRing) GetTokenRangesForInstance(instanceID string) (ring.TokenRanges, error) { + panic("not implemented") +} diff --git a/pkg/pattern/ingester_test.go b/pkg/pattern/ingester_test.go index bc681e8faf02a..6d0ca34856789 100644 --- a/pkg/pattern/ingester_test.go +++ b/pkg/pattern/ingester_test.go @@ -93,7 +93,7 @@ func TestInstancePushQuery(t *testing.T) { t.Run("test count_over_time samples", func(t *testing.T) { lbs := labels.New(labels.Label{Name: "test", Value: "test"}) - inst, err := newInstance("foo", log.NewNopLogger()) + inst, err := newInstance("foo", log.NewNopLogger(), nil) require.NoError(t, err) err = inst.Push(context.Background(), &push.PushRequest{ @@ -185,7 +185,7 @@ func TestInstancePushQuery(t *testing.T) { t.Run("test bytes_over_time samples", func(t *testing.T) { lbs := labels.New(labels.Label{Name: "test", Value: "test"}) - inst, err := newInstance("foo", log.NewNopLogger()) + inst, err := newInstance("foo", log.NewNopLogger(), nil) require.NoError(t, err) err = inst.Push(context.Background(), &push.PushRequest{ diff --git a/pkg/pattern/instance_test.go b/pkg/pattern/instance_test.go index 7edd5364e6f12..e40d512f969ae 100644 --- a/pkg/pattern/instance_test.go +++ b/pkg/pattern/instance_test.go @@ -28,7 +28,7 @@ func TestInstance_QuerySample(t *testing.T) { Step: oneMin, } - instance, err := newInstance("test", log.NewNopLogger()) + instance, err := newInstance("test", log.NewNopLogger(), nil) require.NoError(t, 
err) labels := model.LabelSet{ diff --git a/pkg/pattern/metric/evaluator.go b/pkg/pattern/metric/evaluator.go index a1e301f31276b..86a4c83870b97 100644 --- a/pkg/pattern/metric/evaluator.go +++ b/pkg/pattern/metric/evaluator.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/loki/v3/pkg/logql" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/pattern/iter" + "github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache" "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -48,9 +49,9 @@ func ExtractMetricType(expr syntax.SampleExpr) (MetricType, error) { } type SampleEvaluatorFactory interface { - // NewStepEvaluator returns a NewStepEvaluator for a given SampleExpr. - // It's explicitly passed another NewStepEvaluator - // in order to enable arbitrary computation of embedded expressions. This allows more modular & extensible + // NewStepEvaluator returns a NewStepEvaluator for a given SampleExpr. + // It's explicitly passed another NewStepEvaluator + // in order to enable arbitrary computation of embedded expressions. This allows more modular & extensible // NewStepEvaluator implementations which can be composed. NewStepEvaluator( ctx context.Context, @@ -161,7 +162,7 @@ func (ev *DefaultEvaluatorFactory) NewStepEvaluator( } } -// Need to create our own StepEvaluator since we only support bytes and count over time, +// Need to create our own StepEvaluator since we only support bytes and count over time, // and always sum to get those values. In order to accomplish this we need control over the // aggregation operation.. func NewPatternSampleRangeAggEvaluator( @@ -199,7 +200,7 @@ func newRangeVectorIterator( // TODO(twhitney): do I need a streaming aggregator? // if so the aggregator is going to make this // a bit of a bad time, as there's currently no - // way to provide a custom one. + // way to provide a custom one. 
// // var overlap bool // if selRange >= step && start != end { @@ -222,7 +223,7 @@ func newRangeVectorIterator( // }, nil // } - // always sum + // always sum aggregator := logql.BatchRangeVectorAggregator(func(samples []promql.FPoint) float64 { sum := 0.0 for _, v := range samples { @@ -263,7 +264,6 @@ func (s *SeriesToSampleIterator) Next() bool { current, rest := s.floats[0], s.floats[1:] - //Is timestamp the correct unit here? s.curTs = current.T s.cur = current.F @@ -352,3 +352,7 @@ func (p *paramCompat) GetExpression() syntax.Expr { func (p *paramCompat) GetStoreChunks() *logproto.ChunkRefGroup { return nil } + +func (p *paramCompat) CachingOptions() (res resultscache.CachingOptions) { + return +} diff --git a/pkg/pattern/ring_client.go b/pkg/pattern/ring_client.go index 3ceaf481a3b9b..d1421b842422f 100644 --- a/pkg/pattern/ring_client.go +++ b/pkg/pattern/ring_client.go @@ -13,7 +13,13 @@ import ( "github.com/grafana/loki/v3/pkg/pattern/clientpool" ) -type RingClient struct { +type RingClient interface { + services.Service + Ring() ring.ReadRing + Pool() *ring_client.Pool +} + +type ringClient struct { cfg Config logger log.Logger @@ -29,10 +35,10 @@ func NewRingClient( metricsNamespace string, registerer prometheus.Registerer, logger log.Logger, -) (*RingClient, error) { +) (RingClient, error) { var err error registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer) - ringClient := &RingClient{ + ringClient := &ringClient{ logger: log.With(logger, "component", "pattern-ring-client"), cfg: cfg, } @@ -59,19 +65,81 @@ func NewRingClient( return ringClient, nil } -func (q *RingClient) starting(ctx context.Context) error { - return services.StartManagerAndAwaitHealthy(ctx, q.subservices) +func (r *ringClient) starting(ctx context.Context) error { + return services.StartManagerAndAwaitHealthy(ctx, r.subservices) } -func (q *RingClient) running(ctx context.Context) error { +func (r *ringClient) running(ctx context.Context) error { select 
{ case <-ctx.Done(): return nil - case err := <-q.subservicesWatcher.Chan(): + case err := <-r.subservicesWatcher.Chan(): return fmt.Errorf("pattern tee subservices failed: %w", err) } } -func (q *RingClient) stopping(_ error) error { - return services.StopManagerAndAwaitStopped(context.Background(), q.subservices) +func (r *ringClient) stopping(_ error) error { + return services.StopManagerAndAwaitStopped(context.Background(), r.subservices) +} + +func (r *ringClient) Ring() ring.ReadRing { + return r.ring +} + +func (r *ringClient) Pool() *ring_client.Pool { + return r.pool +} + +// StartAsync starts Service asynchronously. Service must be in New State, otherwise error is returned. +// Context is used as a parent context for service own context. NOTE(review): this and the other lifecycle methods delegate to the embedded services.Service, which ringClient is assumed to carry (confirm against the struct definition); calling r.StartAsync here would recurse without end. +func (r *ringClient) StartAsync(ctx context.Context) error { + return r.Service.StartAsync(ctx) +} + +// AwaitRunning waits until service gets into Running state. +// If service is in New or Starting state, this method is blocking. +// If service is already in Running state, returns immediately with no error. +// If service is in a state, from which it cannot get into Running state, error is returned immediately. +func (r *ringClient) AwaitRunning(ctx context.Context) error { + return r.Service.AwaitRunning(ctx) +} + +// StopAsync tells the service to stop. This method doesn't block and can be called multiple times. +// If Service is New, it is Terminated without having been started nor stopped. +// If Service is in Starting or Running state, this initiates shutdown and returns immediately. +// If Service has already been stopped, this method returns immediately, without taking action. +func (r *ringClient) StopAsync() { + r.Service.StopAsync() +} + +// AwaitTerminated waits for the service to reach Terminated or Failed state. If service is already in one of these states, +// when method is called, method returns immediately. +// If service enters Terminated state, this method returns nil. 
+// If service enters Failed state, or context is finished before reaching Terminated or Failed, error is returned. +func (r *ringClient) AwaitTerminated(ctx context.Context) error { + return r.Service.AwaitTerminated(ctx) +} + +// FailureCase returns error if Service is in Failed state. +// If Service is not in Failed state, this method returns nil. +func (r *ringClient) FailureCase() error { + return r.Service.FailureCase() +} + +// State returns current state of the service. +func (r *ringClient) State() services.State { + return r.Service.State() +} + +// AddListener adds listener to this service. Listener will be notified on subsequent state transitions +// of the service. Previous state transitions are not replayed, so it is suggested to add listeners before +// service is started. +// +// AddListener guarantees execution ordering across calls to a given listener but not across calls to +// multiple listeners. Specifically, a given listener will have its callbacks invoked in the same order +// as the service enters those states. Additionally, at most one of the listener's callbacks will execute +// at once. However, multiple listeners' callbacks may execute concurrently, and listeners may execute +// in an order different from the one in which they were registered. 
+func (r *ringClient) AddListener(listener services.Listener) { + r.Service.AddListener(listener) } diff --git a/pkg/pattern/tee.go b/pkg/pattern/tee.go index 70fb37e1b6929..ed90d0dd478c9 100644 --- a/pkg/pattern/tee.go +++ b/pkg/pattern/tee.go @@ -18,14 +18,14 @@ import ( type Tee struct { cfg Config logger log.Logger - ringClient *RingClient + ringClient RingClient ingesterAppends *prometheus.CounterVec } func NewTee( cfg Config, - ringClient *RingClient, + ringClient RingClient, metricsNamespace string, registerer prometheus.Registerer, logger log.Logger, @@ -38,8 +38,8 @@ func NewTee( Name: "pattern_ingester_appends_total", Help: "The total number of batch appends sent to pattern ingesters.", }, []string{"ingester", "status"}), - cfg: cfg, - ringClient: ringClient, + cfg: cfg, + ringClient: ringClient, } return t, nil @@ -58,7 +58,7 @@ func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) { func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error { var descs [1]ring.InstanceDesc - replicationSet, err := t.ringClient.ring.Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil) + replicationSet, err := t.ringClient.Ring().Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil) if err != nil { return err } @@ -66,7 +66,7 @@ func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error { return errors.New("no instances found") } addr := replicationSet.Instances[0].Addr - client, err := t.ringClient.pool.GetClientFor(addr) + client, err := t.ringClient.Pool().GetClientFor(addr) if err != nil { return err } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index f2fe80566b461..bfe023b90b569 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -34,6 +34,7 @@ import ( logql_log "github.com/grafana/loki/v3/pkg/logql/log" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/logqlmodel" + "github.com/grafana/loki/v3/pkg/pattern" querier_limits 
"github.com/grafana/loki/v3/pkg/querier/limits" "github.com/grafana/loki/v3/pkg/querier/plan" "github.com/grafana/loki/v3/pkg/storage" @@ -1047,6 +1048,12 @@ func (q *SingleTenantQuerier) Patterns(ctx context.Context, req *logproto.QueryP return nil, httpgrpc.Errorf(http.StatusNotFound, "") } res, err := q.patternQuerier.Patterns(ctx, req) + if err != nil { + if errors.Is(err, pattern.ErrParseQuery) { + return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) + } + } + return res, err }