diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go index 2ead4b7768e3..8864a03960bc 100644 --- a/pkg/pattern/ingester.go +++ b/pkg/pattern/ingester.go @@ -20,10 +20,10 @@ import ( ring_client "github.com/grafana/dskit/ring/client" - "github.com/grafana/loki/v3/pkg/pattern/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/pattern/clientpool" "github.com/grafana/loki/v3/pkg/pattern/drain" + "github.com/grafana/loki/v3/pkg/pattern/iter" "github.com/grafana/loki/v3/pkg/util" util_log "github.com/grafana/loki/v3/pkg/util/log" ) diff --git a/pkg/pattern/ingester_querier.go b/pkg/pattern/ingester_querier.go index dbee330680af..2220a2ef41d8 100644 --- a/pkg/pattern/ingester_querier.go +++ b/pkg/pattern/ingester_querier.go @@ -149,7 +149,7 @@ func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ingester := ingester i := i g.Go(func() error { - client, err := q.ringClient.pool.GetClientFor(ingester.Addr) + client, err := q.ringClient.pool.GetClientFor(ingester.Addr) if err != nil { return err } diff --git a/pkg/pattern/ingester_test.go b/pkg/pattern/ingester_test.go index e5ecf9b1a84a..90b1845a90c3 100644 --- a/pkg/pattern/ingester_test.go +++ b/pkg/pattern/ingester_test.go @@ -13,8 +13,9 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/pattern/iter" - "github.com/grafana/loki/pkg/push" "github.com/grafana/loki/v3/pkg/pattern/drain" + + "github.com/grafana/loki/pkg/push" ) func TestInstancePushQuery(t *testing.T) { diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index 940f014271fb..e19ba040ff71 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -151,10 +151,6 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream return nil, fmt.Errorf("failed to create stream: %w", err) } return s, nil - if err != nil { - return nil, fmt.Errorf("failed to create stream: %w", err) - } - return s, nil } func (i *instance) getHashForLabels(ls labels.Labels) model.Fingerprint { diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go index fefe4ee37cef..5f2bd9a42685 100644 --- a/pkg/pattern/stream.go +++ b/pkg/pattern/stream.go @@ -21,6 +21,7 @@ type stream struct { labelHash uint64 patterns *drain.Drain mtx sync.Mutex + logger log.Logger lastTs int64 } @@ -39,6 +40,7 @@ func newStream( labels: labels, labelsString: labels.String(), labelHash: labels.Hash(), + logger: logger, patterns: drain.New(drainCfg, guessedFormat, &drain.Metrics{ PatternsEvictedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "false"), PatternsPrunedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "true"), diff --git a/pkg/querier-rf1/handler.go b/pkg/querier-rf1/handler.go index f7ade2b23721..92e1fa8e6402 100644 --- a/pkg/querier-rf1/handler.go +++ b/pkg/querier-rf1/handler.go @@ -131,14 +131,6 @@ func (h *Handler) Do(ctx context.Context, req queryrangebase.Request) (queryrang } return &queryrange.DetectedLabelsResponse{Response: result}, nil - case *logproto.QuerySamplesRequest: - result, err := h.api.SamplesHandler(ctx, concrete) - if err != nil { - return nil, err - } - return &queryrange.QuerySamplesResponse{ - Response: result, - }, nil default: return nil, fmt.Errorf("unsupported query type %T", req) } diff --git a/pkg/querier-rf1/http.go b/pkg/querier-rf1/http.go index 875647deb7e8..279d52bf9ccc 100644 --- a/pkg/querier-rf1/http.go +++ b/pkg/querier-rf1/http.go @@ -297,19 +297,6 @@ func (q *QuerierAPI) PatternsHandler(ctx context.Context, req *logproto.QueryPat return resp, nil } -func (q *QuerierAPI) SamplesHandler(ctx context.Context, req *logproto.QuerySamplesRequest) (*logproto.QuerySamplesResponse, error) { - resp, err := q.querier.SelectMetricSamples(ctx, req) - if err != nil { - return nil, err - } - if resp == nil { // Some stores don't implement this - return &logproto.QuerySamplesResponse{ - Series: []logproto.Series{}, - }, nil - } - return resp, nil -} - func (q *QuerierAPI) validateMaxEntriesLimits(ctx context.Context, expr syntax.Expr, limit uint32) error { tenantIDs, err := tenant.TenantIDs(ctx) if err != nil { diff --git a/pkg/querier-rf1/querier.go b/pkg/querier-rf1/querier.go index 9504fe23482a..e5e4c2206275 100644 --- a/pkg/querier-rf1/querier.go +++ b/pkg/querier-rf1/querier.go @@ -77,7 +77,6 @@ type Querier interface { DetectedFields(ctx context.Context, req *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error) Patterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) DetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error) - SelectMetricSamples(ctx context.Context, req *logproto.QuerySamplesRequest) (*logproto.QuerySamplesResponse, error) } type Limits querier_limits.Limits @@ -906,7 +905,6 @@ func streamsForFieldDetection(i iter.EntryIterator, size uint32) (logqlmodel.Str type PatterQuerier interface { Patterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) - Samples(ctx context.Context, req *logproto.QuerySamplesRequest) (*logproto.QuerySamplesResponse, error) } func (q *Rf1Querier) WithPatternQuerier(pq querier.PatterQuerier) { @@ -924,7 +922,3 @@ func (q *Rf1Querier) Patterns(ctx context.Context, req *logproto.QueryPatternsRe return res, err } - -func (q *Rf1Querier) SelectMetricSamples(_ context.Context, _ *logproto.QuerySamplesRequest) (*logproto.QuerySamplesResponse, error) { - return nil, httpgrpc.Errorf(http.StatusBadRequest, "not implmented") -} diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index e04aa0381022..cff1f67637a1 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -722,6 +722,8 @@ func (q *querierMock) DetectedLabels(ctx context.Context, req *logproto.Detected return resp.(*logproto.DetectedLabelsResponse), err } +func (q *querierMock) WithPatternQuerier(_ PatterQuerier) {} + type engineMock struct { util.ExtendedMock } diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 0755858058d4..f553c61dafb6 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -289,7 +289,8 @@ func NewDetectedLabelsTripperware(cfg Config, logger log.Logger, l Limits, schem StatsCollectorMiddleware(), NewLimitsMiddleware(l), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, limits, merger, splitter, metrics.SplitByMetrics)} + SplitByIntervalMiddleware(schema.Configs, limits, merger, splitter, metrics.SplitByMetrics), + } if cfg.MaxRetries > 0 { queryRangeMiddleware = append(