Skip to content

Commit

Permalink
Simplify Thanos Query Get Stores (#926)
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv authored and bwplotka committed Mar 15, 2019
1 parent 7fab732 commit 51ff267
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 40 deletions.
4 changes: 1 addition & 3 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,7 @@ func runQuery(
},
dialOpts,
)
proxy = store.NewProxyStore(logger, func(context.Context) ([]store.Client, error) {
return stores.Get(), nil
}, component.Query, selectorLset)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
engine = promql.NewEngine(
promql.EngineOpts{
Expand Down
19 changes: 4 additions & 15 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Client interface {
// ProxyStore implements the store API that proxies request to all given underlying stores.
type ProxyStore struct {
logger log.Logger
stores func(context.Context) ([]Client, error)
stores func() []Client
component component.StoreAPI
selectorLabels labels.Labels
}
Expand All @@ -46,7 +46,7 @@ type ProxyStore struct {
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL)
func NewProxyStore(
logger log.Logger,
stores func(context.Context) ([]Client, error),
stores func() []Client,
component component.StoreAPI,
selectorLabels labels.Labels,
) *ProxyStore {
Expand Down Expand Up @@ -109,13 +109,6 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
return nil
}

stores, err := s.stores(srv.Context())
if err != nil {
err = errors.Wrap(err, "failed to get store APIs")
level.Error(s.logger).Log("err", err)
return status.Errorf(codes.Unknown, err.Error())
}

var (
g, gctx = errgroup.WithContext(srv.Context())

Expand Down Expand Up @@ -144,7 +137,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
closeFn()
}()

for _, st := range stores {
for _, st := range s.stores() {
// We might be able to skip the store if its meta information indicates
// it cannot have series matching our query.
// NOTE: all matchers are validated in labelsMatches method so we explicitly ignore error.
Expand Down Expand Up @@ -337,11 +330,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
g, gctx = errgroup.WithContext(ctx)
)

stores, err := s.stores(ctx)
if err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
}
for _, st := range stores {
for _, st := range s.stores() {
store := st
g.Go(func() error {
resp, err := store.LabelValues(gctx, &storepb.LabelValuesRequest{
Expand Down
27 changes: 5 additions & 22 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,31 +42,14 @@ func (c *testClient) String() string {
return "test"
}

func TestProxyStore_Series_StoresFetchFail(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

q := NewProxyStore(nil,
func(_ context.Context) ([]Client, error) { return nil, errors.New("Fail") },
component.Query,
nil,
)

s := newStoreSeriesServer(context.Background())
testutil.NotOk(t, q.Series(&storepb.SeriesRequest{
MinTime: 1,
MaxTime: 300,
Matchers: []storepb.LabelMatcher{{Name: "a", Value: "a", Type: storepb.LabelMatcher_EQ}},
}, s))
}

func TestProxyStore_Info(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

q := NewProxyStore(nil,
func(context.Context) ([]Client, error) { return nil, nil },
func() []Client { return nil },
component.Query,
nil,
)
Expand Down Expand Up @@ -419,7 +402,7 @@ func TestProxyStore_Series(t *testing.T) {
} {
if ok := t.Run(tc.title, func(t *testing.T) {
q := NewProxyStore(nil,
func(_ context.Context) ([]Client, error) { return tc.storeAPIs, nil }, // what if err?
func() []Client { return tc.storeAPIs },
component.Query,
tc.selectorLabels,
)
Expand Down Expand Up @@ -460,7 +443,7 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) {
},
}
q := NewProxyStore(nil,
func(context.Context) ([]Client, error) { return cls, nil },
func() []Client { return cls },
component.Query,
nil,
)
Expand Down Expand Up @@ -518,7 +501,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) {
}

q := NewProxyStore(nil,
func(context.Context) ([]Client, error) { return cls, nil },
func() []Client { return cls },
component.Query,
tlabels.FromStrings("fed", "a"),
)
Expand Down Expand Up @@ -555,7 +538,7 @@ func TestProxyStore_LabelValues(t *testing.T) {
}},
}
q := NewProxyStore(nil,
func(context.Context) ([]Client, error) { return cls, nil },
func() []Client { return cls },
component.Query,
nil,
)
Expand Down

0 comments on commit 51ff267

Please sign in to comment.