Skip to content

Commit

Permalink
tsdb index gateway (#6158)
Browse files Browse the repository at this point in the history
* enable new grpc calls on index gateway and add GetSeries call

* use index gateway for tsdb index store queries

* add missing file

* lint

* disable IndexGatewayClientStore for stores other than tsdb until we are ready to enable it again
  • Loading branch information
sandeepsukhani authored May 18, 2022
1 parent 4f87db7 commit 62f5e59
Show file tree
Hide file tree
Showing 11 changed files with 1,149 additions and 187 deletions.
65 changes: 21 additions & 44 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/series"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper"
Expand All @@ -32,7 +31,6 @@ import (
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/deletion"
util_log "github.com/grafana/loki/pkg/util/log"
)

var (
Expand Down Expand Up @@ -200,58 +198,37 @@ func shouldUseBoltDBIndexGatewayClient(cfg Config) bool {
return true
}

func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client, f *fetcher.Fetcher) (stores.ChunkWriter, stores.Index, func(), error) {
func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client, f *fetcher.Fetcher) (stores.ChunkWriter, series.IndexStore, func(), error) {
indexClientReg := prometheus.WrapRegistererWith(
prometheus.Labels{"component": "index-store-" + p.From.String()}, s.registerer)

if p.IndexType == config.TSDBType {
var (
nodeName = s.cfg.TSDBShipperConfig.IngesterName
dir = s.cfg.TSDBShipperConfig.ActiveIndexDirectory
)
tsdbMetrics := tsdb.NewMetrics(indexClientReg)
objectClient, err := NewObjectClient(s.cfg.TSDBShipperConfig.SharedStoreType, s.cfg, s.clientMetrics)
if err != nil {
return nil, nil, nil, err
}

shpr, err := indexshipper.NewIndexShipper(
s.cfg.TSDBShipperConfig,
objectClient,
s.limits,
tsdb.OpenShippableTSDB,
)
// ToDo(Sandeep): Avoid initializing writer when in read only mode
writer, idx, err := tsdb.NewStore(s.cfg.TSDBShipperConfig, p, f, objectClient, s.limits, indexClientReg)
if err != nil {
return nil, nil, nil, err
}
tsdbManager := tsdb.NewTSDBManager(
nodeName,
dir,
shpr,
p.IndexTables.Period,
util_log.Logger,
tsdbMetrics,
)
// TODO(owen-d): Only need HeadManager
// on the ingester. Otherwise, the TSDBManager is sufficient
headManager := tsdb.NewHeadManager(
util_log.Logger,
dir,
tsdbMetrics,
tsdbManager,
)
if err := headManager.Start(); err != nil {
return nil, nil, nil, err
}
idx := tsdb.NewIndexClient(headManager, p)
writer := tsdb.NewChunkWriter(f, p, headManager)

// TODO(owen-d): add TSDB index-gateway support
// ToDo(Sandeep): Refactor code to not use boltdb-shipper index gateway client config
if shouldUseBoltDBIndexGatewayClient(s.cfg) {
// inject the index-gateway client into the index store
gw, err := shipper.NewGatewayClient(s.cfg.BoltDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.logger)
if err != nil {
return nil, nil, nil, err
}
idx = series.NewIndexGatewayClientStore(gw, idx)
}

return writer, idx,
func() {
chunkClient.Stop()
f.Stop()
chunkClient.Stop()
objectClient.Stop()
}, nil
}

Expand All @@ -269,22 +246,22 @@ func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client,
}

var (
writer stores.ChunkWriter = series.NewWriter(f, s.schemaCfg, idx, schema, s.writeDedupeCache, s.storeCfg.DisableIndexDeduplication)
seriesdIndex *series.IndexStore = series.NewIndexStore(s.schemaCfg, schema, idx, f, s.cfg.MaxChunkBatchSize)
index stores.Index = seriesdIndex
writer stores.ChunkWriter = series.NewWriter(f, s.schemaCfg, idx, schema, s.writeDedupeCache, s.storeCfg.DisableIndexDeduplication)
indexStore = series.NewIndexStore(s.schemaCfg, schema, idx, f, s.cfg.MaxChunkBatchSize)
)

if shouldUseBoltDBIndexGatewayClient(s.cfg) {
// (Sandeep): Disable IndexGatewayClientStore for stores other than tsdb until we are ready to enable it again
/*if shouldUseBoltDBIndexGatewayClient(s.cfg) {
// inject the index-gateway client into the index store
gw, err := shipper.NewGatewayClient(s.cfg.BoltDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.logger)
if err != nil {
return nil, nil, nil, err
}
index = series.NewIndexGatewayClientStore(gw, seriesdIndex)
}
indexStore = series.NewIndexGatewayClientStore(gw, indexStore)
}*/

return writer,
index,
indexStore,
func() {
chunkClient.Stop()
f.Stop()
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/stores/composite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/stores/series"
"github.com/grafana/loki/pkg/util"
)

Expand Down Expand Up @@ -44,7 +45,7 @@ func NewCompositeStore(limits StoreLimits) *CompositeStore {
return &CompositeStore{compositeStore{}, limits}
}

func (c *CompositeStore) AddStore(start model.Time, fetcher *fetcher.Fetcher, index Index, writer ChunkWriter, stop func()) {
func (c *CompositeStore) AddStore(start model.Time, fetcher *fetcher.Fetcher, index series.IndexStore, writer ChunkWriter, stop func()) {
c.stores = append(c.stores, compositeStoreEntry{
start: start,
Store: &storeEntry{
Expand Down
15 changes: 2 additions & 13 deletions pkg/storage/stores/composite_store_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/errors"
"github.com/grafana/loki/pkg/storage/stores/series"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/spanlogger"
"github.com/grafana/loki/pkg/util/validation"
Expand All @@ -25,17 +25,6 @@ type StoreLimits interface {
MaxQueryLength(userID string) time.Duration
}

type Index interface {
GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]logproto.ChunkRef, error)
GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error)
LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error)
LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error)
// SetChunkFilterer sets a chunk filter to be used when retrieving chunks.
// This is only used for GetSeries implementation.
// Todo we might want to pass it as a parameter to GetSeries instead.
SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer)
}

type ChunkWriter interface {
Put(ctx context.Context, chunks []chunk.Chunk) error
PutOne(ctx context.Context, from, through model.Time, chunk chunk.Chunk) error
Expand All @@ -50,7 +39,7 @@ type storeEntry struct {
limits StoreLimits
stop func()
fetcher *fetcher.Fetcher
index Index
index series.IndexStore
ChunkWriter
}

Expand Down
90 changes: 84 additions & 6 deletions pkg/storage/stores/series/series_index_gateway_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,123 @@ package series
import (
"context"

"github.com/gogo/status"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
)

type IndexGatewayClientStore struct {
client IndexGatewayClient
*IndexStore
IndexStore
}

type IndexGatewayClient interface {
GetChunkRef(ctx context.Context, in *indexgatewaypb.GetChunkRefRequest, opts ...grpc.CallOption) (*indexgatewaypb.GetChunkRefResponse, error)
GetSeries(ctx context.Context, in *indexgatewaypb.GetSeriesRequest, opts ...grpc.CallOption) (*indexgatewaypb.GetSeriesResponse, error)
LabelNamesForMetricName(ctx context.Context, in *indexgatewaypb.LabelNamesForMetricNameRequest, opts ...grpc.CallOption) (*indexgatewaypb.LabelResponse, error)
LabelValuesForMetricName(ctx context.Context, in *indexgatewaypb.LabelValuesForMetricNameRequest, opts ...grpc.CallOption) (*indexgatewaypb.LabelResponse, error)
}

func NewIndexGatewayClientStore(client IndexGatewayClient, index *IndexStore) *IndexGatewayClientStore {
func NewIndexGatewayClientStore(client IndexGatewayClient, index IndexStore) IndexStore {
return &IndexGatewayClientStore{
client: client,
IndexStore: index,
}
}

func (c *IndexGatewayClientStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, allMatchers ...*labels.Matcher) ([]logproto.ChunkRef, error) {
return c.IndexStore.GetChunkRefs(ctx, userID, from, through, allMatchers...)
response, err := c.client.GetChunkRef(ctx, &indexgatewaypb.GetChunkRefRequest{
From: from,
Through: through,
Matchers: (&syntax.MatchersExpr{Mts: allMatchers}).String(),
})
if err != nil {
if isUnimplementedCallError(err) {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.GetChunkRefs(ctx, userID, from, through, allMatchers...)
}
return nil, err
}
result := make([]logproto.ChunkRef, len(response.Refs))
for i, ref := range response.Refs {
result[i] = *ref
}

return result, nil
}

func (c *IndexGatewayClientStore) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
return c.IndexStore.GetSeries(ctx, userID, from, through, matchers...)
resp, err := c.client.GetSeries(ctx, &indexgatewaypb.GetSeriesRequest{
From: from,
Through: through,
Matchers: (&syntax.MatchersExpr{Mts: matchers}).String(),
})
if err != nil {
if isUnimplementedCallError(err) {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.GetSeries(ctx, userID, from, through, matchers...)
}
return nil, err
}

result := make([]labels.Labels, len(resp.Series))
for i, s := range resp.Series {
result[i] = logproto.FromLabelAdaptersToLabels(s.Labels)
}

return result, nil
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *IndexGatewayClientStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
return c.IndexStore.LabelNamesForMetricName(ctx, userID, from, through, metricName)
resp, err := c.client.LabelNamesForMetricName(ctx, &indexgatewaypb.LabelNamesForMetricNameRequest{
MetricName: metricName,
From: from,
Through: through,
})
if isUnimplementedCallError(err) {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.LabelNamesForMetricName(ctx, userID, from, through, metricName)
}
if err != nil {
return nil, err
}
return resp.Values, nil
}

func (c *IndexGatewayClientStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
return c.IndexStore.LabelValuesForMetricName(ctx, userID, from, through, metricName, labelName)
resp, err := c.client.LabelValuesForMetricName(ctx, &indexgatewaypb.LabelValuesForMetricNameRequest{
MetricName: metricName,
LabelName: labelName,
From: from,
Through: through,
Matchers: (&syntax.MatchersExpr{Mts: matchers}).String(),
})
if isUnimplementedCallError(err) {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.LabelValuesForMetricName(ctx, userID, from, through, metricName, labelName, matchers...)
}
if err != nil {
return nil, err
}
return resp.Values, nil
}

// isUnimplementedCallError tells if the GRPC error is a gRPC error with code Unimplemented.
func isUnimplementedCallError(err error) bool {
if err == nil {
return false
}

s, ok := status.FromError(err)
if !ok {
return false
}
return (s.Code() == codes.Unimplemented)
}
25 changes: 7 additions & 18 deletions pkg/storage/stores/series/series_index_gateway_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,18 @@ func (fakeClient) GetChunkRef(ctx context.Context, in *indexgatewaypb.GetChunkRe
return &indexgatewaypb.GetChunkRefResponse{}, nil
}

func Test_IndexGatewayClient(t *testing.T) {
schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{
{From: config.DayTime{Time: model.Now().Add(-24 * time.Hour)}, Schema: "v12", RowShards: 16},
},
}
schema, err := index.CreateSchema(schemaCfg.Configs[0])
require.NoError(t, err)
testutils.ResetMockStorage()
tm, err := index.NewTableManager(index.TableManagerConfig{}, schemaCfg, 2*time.Hour, testutils.NewMockStorage(), nil, nil, nil)
require.NoError(t, err)
require.NoError(t, tm.SyncTables(context.Background()))
func (fakeClient) GetSeries(ctx context.Context, in *indexgatewaypb.GetSeriesRequest, opts ...grpc.CallOption) (*indexgatewaypb.GetSeriesResponse, error) {
return &indexgatewaypb.GetSeriesResponse{}, nil
}

func Test_IndexGatewayClient(t *testing.T) {
idx := IndexGatewayClientStore{
client: fakeClient{},
IndexStore: &IndexStore{
IndexStore: &indexStore{
chunkBatchSize: 1,
schema: schema,
schemaCfg: schemaCfg,
index: testutils.NewMockStorage(),
},
}
_, err = idx.GetSeries(context.Background(), "foo", model.Now(), model.Now().Add(1*time.Hour), labels.MustNewMatcher(labels.MatchEqual, "__name__", "logs"))
_, err := idx.GetSeries(context.Background(), "foo", model.Earliest, model.Latest)
require.NoError(t, err)
}

Expand Down Expand Up @@ -106,7 +95,7 @@ func Test_IndexGatewayClient_Fallback(t *testing.T) {
require.NoError(t, tm.SyncTables(context.Background()))
idx := NewIndexGatewayClientStore(
indexgatewaypb.NewIndexGatewayClient(conn),
&IndexStore{
&indexStore{
chunkBatchSize: 1,
schema: schema,
schemaCfg: schemaCfg,
Expand Down
Loading

0 comments on commit 62f5e59

Please sign in to comment.