Skip to content

Commit

Permalink
Disable calling new index-gateway client's API. (#6025)
Browse files Browse the repository at this point in the history
* Disable calling new index-gateway client's API.

* Remove infinite recursive call

* Correct mock setup for test
  • Loading branch information
kavirajk committed Apr 26, 2022
1 parent bcb8009 commit 0cc3fe7
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 68 deletions.
71 changes: 4 additions & 67 deletions pkg/storage/stores/series/series_index_gateway_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@ package series
import (
"context"

"github.com/gogo/status"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
)

Expand All @@ -33,78 +30,18 @@ func NewIndexGatewayClientStore(client IndexGatewayClient, index *IndexStore) *I
}

func (c *IndexGatewayClientStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, allMatchers ...*labels.Matcher) ([]logproto.ChunkRef, error) {
response, err := c.client.GetChunkRef(ctx, &indexgatewaypb.GetChunkRefRequest{
From: from,
Through: through,
Matchers: (&syntax.MatchersExpr{Mts: allMatchers}).String(),
})
if err != nil {
if isUnimplementedCallError(err) {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.GetChunkRefs(ctx, userID, from, through, allMatchers...)
}
return nil, err
}
result := make([]logproto.ChunkRef, len(response.Refs))
for i, ref := range response.Refs {
result[i] = *ref
}

return result, nil
return c.IndexStore.GetChunkRefs(ctx, userID, from, through, allMatchers...)
}

func (c *IndexGatewayClientStore) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
refs, err := c.GetChunkRefs(ctx, userID, from, through, matchers...)
if err != nil {
return nil, err
}
return c.chunksToSeries(ctx, refs, matchers)
return c.IndexStore.GetSeries(ctx, userID, from, through, matchers...)
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *IndexGatewayClientStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
resp, err := c.client.LabelNamesForMetricName(ctx, &indexgatewaypb.LabelNamesForMetricNameRequest{
MetricName: metricName,
From: from,
Through: through,
})
if isUnimplementedCallError(err) {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.LabelNamesForMetricName(ctx, userID, from, through, metricName)
}
if err != nil {
return nil, err
}
return resp.Values, nil
return c.IndexStore.LabelNamesForMetricName(ctx, userID, from, through, metricName)
}

func (c *IndexGatewayClientStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
resp, err := c.client.LabelValuesForMetricName(ctx, &indexgatewaypb.LabelValuesForMetricNameRequest{
MetricName: metricName,
LabelName: labelName,
From: from,
Through: through,
Matchers: (&syntax.MatchersExpr{Mts: matchers}).String(),
})
if isUnimplementedCallError(err) {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.LabelValuesForMetricName(ctx, userID, from, through, metricName, labelName, matchers...)
}
if err != nil {
return nil, err
}
return resp.Values, nil
}

// isUnimplementedCallError tells if the GRPC error is a gRPC error with code Unimplemented.
func isUnimplementedCallError(err error) bool {
if err == nil {
return false
}

s, ok := status.FromError(err)
if !ok {
return false
}
return (s.Code() == codes.Unimplemented)
return c.IndexStore.LabelValuesForMetricName(ctx, userID, from, through, metricName, labelName)
}
17 changes: 16 additions & 1 deletion pkg/storage/stores/series/series_index_gateway_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,28 @@ func (fakeClient) GetChunkRef(ctx context.Context, in *indexgatewaypb.GetChunkRe
}

func Test_IndexGatewayClient(t *testing.T) {
schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{
{From: config.DayTime{Time: model.Now().Add(-24 * time.Hour)}, Schema: "v12", RowShards: 16},
},
}
schema, err := index.CreateSchema(schemaCfg.Configs[0])
require.NoError(t, err)
testutils.ResetMockStorage()
tm, err := index.NewTableManager(index.TableManagerConfig{}, schemaCfg, 2*time.Hour, testutils.NewMockStorage(), nil, nil, nil)
require.NoError(t, err)
require.NoError(t, tm.SyncTables(context.Background()))

idx := IndexGatewayClientStore{
client: fakeClient{},
IndexStore: &IndexStore{
chunkBatchSize: 1,
schema: schema,
schemaCfg: schemaCfg,
index: testutils.NewMockStorage(),
},
}
_, err := idx.GetSeries(context.Background(), "foo", model.Earliest, model.Latest)
_, err = idx.GetSeries(context.Background(), "foo", model.Now(), model.Now().Add(1*time.Hour), labels.MustNewMatcher(labels.MatchEqual, "__name__", "logs"))
require.NoError(t, err)
}

Expand Down

0 comments on commit 0cc3fe7

Please sign in to comment.