From 51ff2678532981b3e124016f0d9a071b04f0ea90 Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Fri, 15 Mar 2019 12:48:44 +0200
Subject: [PATCH] Simplify Thanos Query Get Stores (#926)
---
cmd/thanos/query.go | 4 +---
pkg/store/proxy.go | 19 ++++---------------
pkg/store/proxy_test.go | 27 +++++----------------------
3 files changed, 10 insertions(+), 40 deletions(-)
diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go
index a69ac0df1a..df86dbad0c 100644
--- a/cmd/thanos/query.go
+++ b/cmd/thanos/query.go
@@ -304,9 +304,7 @@ func runQuery(
},
dialOpts,
)
- proxy = store.NewProxyStore(logger, func(context.Context) ([]store.Client, error) {
- return stores.Get(), nil
- }, component.Query, selectorLset)
+ proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
engine = promql.NewEngine(
promql.EngineOpts{
diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go
index e9f3cfdc74..556bf77da6 100644
--- a/pkg/store/proxy.go
+++ b/pkg/store/proxy.go
@@ -37,7 +37,7 @@ type Client interface {
// ProxyStore implements the store API that proxies request to all given underlying stores.
type ProxyStore struct {
logger log.Logger
- stores func(context.Context) ([]Client, error)
+ stores func() []Client
component component.StoreAPI
selectorLabels labels.Labels
}
@@ -46,7 +46,7 @@ type ProxyStore struct {
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL)
func NewProxyStore(
logger log.Logger,
- stores func(context.Context) ([]Client, error),
+ stores func() []Client,
component component.StoreAPI,
selectorLabels labels.Labels,
) *ProxyStore {
@@ -109,13 +109,6 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
return nil
}
- stores, err := s.stores(srv.Context())
- if err != nil {
- err = errors.Wrap(err, "failed to get store APIs")
- level.Error(s.logger).Log("err", err)
- return status.Errorf(codes.Unknown, err.Error())
- }
-
var (
g, gctx = errgroup.WithContext(srv.Context())
@@ -144,7 +137,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
closeFn()
}()
- for _, st := range stores {
+ for _, st := range s.stores() {
// We might be able to skip the store if its meta information indicates
// it cannot have series matching our query.
// NOTE: all matchers are validated in labelsMatches method so we explicitly ignore error.
@@ -337,11 +330,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
g, gctx = errgroup.WithContext(ctx)
)
- stores, err := s.stores(ctx)
- if err != nil {
- return nil, status.Errorf(codes.Unknown, err.Error())
- }
- for _, st := range stores {
+ for _, st := range s.stores() {
store := st
g.Go(func() error {
resp, err := store.LabelValues(gctx, &storepb.LabelValuesRequest{
diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go
index db1819cebb..fa3e7e110a 100644
--- a/pkg/store/proxy_test.go
+++ b/pkg/store/proxy_test.go
@@ -42,23 +42,6 @@ func (c *testClient) String() string {
return "test"
}
-func TestProxyStore_Series_StoresFetchFail(t *testing.T) {
- defer leaktest.CheckTimeout(t, 10*time.Second)()
-
- q := NewProxyStore(nil,
- func(_ context.Context) ([]Client, error) { return nil, errors.New("Fail") },
- component.Query,
- nil,
- )
-
- s := newStoreSeriesServer(context.Background())
- testutil.NotOk(t, q.Series(&storepb.SeriesRequest{
- MinTime: 1,
- MaxTime: 300,
- Matchers: []storepb.LabelMatcher{{Name: "a", Value: "a", Type: storepb.LabelMatcher_EQ}},
- }, s))
-}
-
func TestProxyStore_Info(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()
@@ -66,7 +49,7 @@ func TestProxyStore_Info(t *testing.T) {
defer cancel()
q := NewProxyStore(nil,
- func(context.Context) ([]Client, error) { return nil, nil },
+ func() []Client { return nil },
component.Query,
nil,
)
@@ -419,7 +402,7 @@ func TestProxyStore_Series(t *testing.T) {
} {
if ok := t.Run(tc.title, func(t *testing.T) {
q := NewProxyStore(nil,
- func(_ context.Context) ([]Client, error) { return tc.storeAPIs, nil }, // what if err?
+ func() []Client { return tc.storeAPIs },
component.Query,
tc.selectorLabels,
)
@@ -460,7 +443,7 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) {
},
}
q := NewProxyStore(nil,
- func(context.Context) ([]Client, error) { return cls, nil },
+ func() []Client { return cls },
component.Query,
nil,
)
@@ -518,7 +501,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) {
}
q := NewProxyStore(nil,
- func(context.Context) ([]Client, error) { return cls, nil },
+ func() []Client { return cls },
component.Query,
tlabels.FromStrings("fed", "a"),
)
@@ -555,7 +538,7 @@ func TestProxyStore_LabelValues(t *testing.T) {
}},
}
q := NewProxyStore(nil,
- func(context.Context) ([]Client, error) { return cls, nil },
+ func() []Client { return cls },
component.Query,
nil,
)