diff --git a/pkg/storage/stores/series/series_index_gateway_store.go b/pkg/storage/stores/series/series_index_gateway_store.go index 5243c537a578..9840dad6f200 100644 --- a/pkg/storage/stores/series/series_index_gateway_store.go +++ b/pkg/storage/stores/series/series_index_gateway_store.go @@ -3,14 +3,11 @@ package series import ( "context" - "github.com/gogo/status" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "google.golang.org/grpc" - "google.golang.org/grpc/codes" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb" ) @@ -33,78 +30,18 @@ func NewIndexGatewayClientStore(client IndexGatewayClient, index *IndexStore) *I } func (c *IndexGatewayClientStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, allMatchers ...*labels.Matcher) ([]logproto.ChunkRef, error) { - response, err := c.client.GetChunkRef(ctx, &indexgatewaypb.GetChunkRefRequest{ - From: from, - Through: through, - Matchers: (&syntax.MatchersExpr{Mts: allMatchers}).String(), - }) - if err != nil { - if isUnimplementedCallError(err) { - // Handle communication with older index gateways gracefully, by falling back to the index store calls. - return c.IndexStore.GetChunkRefs(ctx, userID, from, through, allMatchers...) - } - return nil, err - } - result := make([]logproto.ChunkRef, len(response.Refs)) - for i, ref := range response.Refs { - result[i] = *ref - } - - return result, nil + return c.IndexStore.GetChunkRefs(ctx, userID, from, through, allMatchers...) } func (c *IndexGatewayClientStore) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) { - refs, err := c.GetChunkRefs(ctx, userID, from, through, matchers...) - if err != nil { - return nil, err - } - return c.chunksToSeries(ctx, refs, matchers) + return c.IndexStore.GetSeries(ctx, userID, from, through, matchers...) } // LabelNamesForMetricName retrieves all label names for a metric name. func (c *IndexGatewayClientStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) { - resp, err := c.client.LabelNamesForMetricName(ctx, &indexgatewaypb.LabelNamesForMetricNameRequest{ - MetricName: metricName, - From: from, - Through: through, - }) - if isUnimplementedCallError(err) { - // Handle communication with older index gateways gracefully, by falling back to the index store calls. - return c.IndexStore.LabelNamesForMetricName(ctx, userID, from, through, metricName) - } - if err != nil { - return nil, err - } - return resp.Values, nil + return c.IndexStore.LabelNamesForMetricName(ctx, userID, from, through, metricName) } func (c *IndexGatewayClientStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) { - resp, err := c.client.LabelValuesForMetricName(ctx, &indexgatewaypb.LabelValuesForMetricNameRequest{ - MetricName: metricName, - LabelName: labelName, - From: from, - Through: through, - Matchers: (&syntax.MatchersExpr{Mts: matchers}).String(), - }) - if isUnimplementedCallError(err) { - // Handle communication with older index gateways gracefully, by falling back to the index store calls. - return c.IndexStore.LabelValuesForMetricName(ctx, userID, from, through, metricName, labelName, matchers...) - } - if err != nil { - return nil, err - } - return resp.Values, nil -} - -// isUnimplementedCallError tells if the GRPC error is a gRPC error with code Unimplemented. -func isUnimplementedCallError(err error) bool { - if err == nil { - return false - } - - s, ok := status.FromError(err) - if !ok { - return false - } - return (s.Code() == codes.Unimplemented) + return c.IndexStore.LabelValuesForMetricName(ctx, userID, from, through, metricName, labelName) } diff --git a/pkg/storage/stores/series/series_index_gateway_store_test.go b/pkg/storage/stores/series/series_index_gateway_store_test.go index c1bbd65868ed..239f962ef017 100644 --- a/pkg/storage/stores/series/series_index_gateway_store_test.go +++ b/pkg/storage/stores/series/series_index_gateway_store_test.go @@ -28,13 +28,28 @@ func (fakeClient) GetChunkRef(ctx context.Context, in *indexgatewaypb.GetChunkRe } func Test_IndexGatewayClient(t *testing.T) { + schemaCfg := config.SchemaConfig{ + Configs: []config.PeriodConfig{ + {From: config.DayTime{Time: model.Now().Add(-24 * time.Hour)}, Schema: "v12", RowShards: 16}, + }, + } + schema, err := index.CreateSchema(schemaCfg.Configs[0]) + require.NoError(t, err) + testutils.ResetMockStorage() + tm, err := index.NewTableManager(index.TableManagerConfig{}, schemaCfg, 2*time.Hour, testutils.NewMockStorage(), nil, nil, nil) + require.NoError(t, err) + require.NoError(t, tm.SyncTables(context.Background())) + idx := IndexGatewayClientStore{ client: fakeClient{}, IndexStore: &IndexStore{ chunkBatchSize: 1, + schema: schema, + schemaCfg: schemaCfg, + index: testutils.NewMockStorage(), }, } - _, err := idx.GetSeries(context.Background(), "foo", model.Earliest, model.Latest) + _, err = idx.GetSeries(context.Background(), "foo", model.Now(), model.Now().Add(1*time.Hour), labels.MustNewMatcher(labels.MatchEqual, "__name__", "logs")) require.NoError(t, err) }