From 47b3613991be4445f3054f35b6efdb36695dc956 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Tue, 10 Mar 2020 17:02:38 +0200 Subject: [PATCH 1/2] query: add --store-strict flag MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a new flag called `--store-strict` as agreed per https://thanos.io/proposals/202001_thanos_query_health_handling.md/ I have updated the proposal to reflect the reality. Third time's the charm, I believe it :-) Now the flag is called `--store-strict` which only accepts statically defined nodes. I guess the code is even simpler now. I have also fixed one small issue where `%w` was used in `errors.Errorf`. Couldn't compile Thanos locally with Go 1.14 without this fix. Signed-off-by: Giedrius Statkevičius --- CHANGELOG.md | 1 + cmd/thanos/query.go | 19 +++- docs/components/query.md | 5 + .../202001_thanos_query_health_handling.md | 11 ++- pkg/discovery/dns/provider.go | 27 ++++-- pkg/discovery/dns/provider_test.go | 32 +++++++ pkg/query/storeset.go | 57 ++++++++---- pkg/query/storeset_test.go | 92 ++++++++++++++++++- pkg/store/postings_codec.go | 2 +- 9 files changed, 208 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fbfe3cf048..6714e69e8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Added +- [#2252](https://github.com/thanos-io/thanos/pull/2252) Query: add new `--store.strict-mode` flag. More information available [here](/docs/proposals/202001_thanos_query_health_handling.md). - [#2265](https://github.com/thanos-io/thanos/pull/2265) Compactor: Add `--wait-interval` to specify compaction wait interval between consecutive compact runs when `--wait` enabled. - [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). 
Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication. We plan to add a smarter algorithm in the following weeks. - [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process. diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index ed266904e7..4a352f35aa 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -78,6 +78,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { stores := cmd.Flag("store", "Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups."). PlaceHolder("").Strings() + strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top."). + PlaceHolder("").Strings() + fileSDFiles := cmd.Flag("store.sd-files", "Path to files that contain addresses of store API servers. The path can be a glob pattern (repeatable)."). PlaceHolder("").Strings() @@ -162,6 +165,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { *dnsSDResolver, time.Duration(*unhealthyStoreTimeout), time.Duration(*instantDefaultMaxSourceResolution), + *strictStores, component.Query, ) } @@ -202,6 +206,7 @@ func runQuery( dnsSDResolver string, unhealthyStoreTimeout time.Duration, instantDefaultMaxSourceResolution time.Duration, + strictStores []string, comp component.Component, ) error { // TODO(bplotka in PR #513 review): Move arguments into struct. 
@@ -222,14 +227,24 @@ func runQuery( dns.ResolverType(dnsSDResolver), ) + for _, store := range strictStores { + if dns.IsDynamicNode(store) { + return errors.Errorf("%s is a dynamically specified store i.e. it uses SD and that is not permitted under strict mode. Use --store for this", store) + } + } + var ( stores = query.NewStoreSet( logger, reg, func() (specs []query.StoreSpec) { - // Add DNS resolved addresses from static flags and file SD. + // Add DNS resolved addresses. for _, addr := range dnsProvider.Addresses() { - specs = append(specs, query.NewGRPCStoreSpec(addr)) + specs = append(specs, query.NewGRPCStoreSpec(addr, false)) + } + // Add strict & static nodes. + for _, addr := range strictStores { + specs = append(specs, query.NewGRPCStoreSpec(addr, true)) } specs = removeDuplicateStoreSpecs(logger, duplicatedStores, specs) diff --git a/docs/components/query.md b/docs/components/query.md index 96f12e3217..563fd5ce4b 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -327,6 +327,11 @@ Flags: prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups. + --store-strict= ... + Addresses of only statically configured store + API servers that are always used, even if the + health check fails. Useful if you have a + caching layer on top. --store.sd-files= ... Path to files that contain addresses of store API servers. The path can be a glob pattern diff --git a/docs/proposals/202001_thanos_query_health_handling.md b/docs/proposals/202001_thanos_query_health_handling.md index 3782d4b6cb..a07b19b2a4 100644 --- a/docs/proposals/202001_thanos_query_health_handling.md +++ b/docs/proposals/202001_thanos_query_health_handling.md @@ -2,7 +2,7 @@ title: Thanos Query store nodes healthiness handling type: proposal menu: proposals -status: accepted +status: complete owner: GiedriusS --- @@ -35,6 +35,7 @@ Thus, this logic needs to be changed somehow. There are a few possible options: 2. 
Another option could be introduced such as `--store.hold-timeout` which would be `--store.unhealthy-timeout`'s brother and we would hold the StoreAPI nodes for `max(hold_timeout, unhealthy_timeout)`. 3. Another option such as `--store.strict-mode` could be introduced which means that we would always retain the last information of the StoreAPI nodes of the last successful check. 4. The StoreAPI node specification format that is used in `--store` could be extended to include another flag which would let specify the previous option per-specific node. +5. Instead of extending the specification format, we could move the same information to the command line options themselves. This would increase the explicitness of this new mode i.e. that it only applies to statically defined nodes. Lets look through their pros and cons: @@ -47,10 +48,10 @@ If we were to graph these choices in terms of their incisiveness and complexity ```text Most incisive / Least Complex ------------ Least incisive / Most Complex #1 #2 #4 - #3 + #3 #5 ``` -After careful consideration and with the rationale in this proposal, we have decided to go with the third option. It should provide a sweet spot between being too invasive and providing our users the ability to fall-back to the old behavior. +After careful consideration and with the rationale in this proposal, we have decided to go with the fifth option. It should provide a sweet spot between being too invasive and providing our users the ability to fall-back to the old behavior. ## Goals @@ -77,7 +78,7 @@ The way this will need to be done should be as generic as possible so the design ## Proposal -* Add a new flag to Thanos Query `--store.strict-mode` which will make it always retain the last successfully retrieved information via the `Info()` gRPC method of **statically** defined nodes and thus always consider them part of the active store set. 
+* Add a new flag to Thanos Query `--store-strict` which will only accept statically specified nodes and Thanos Query will always retain the last successfully retrieved information of them via the `Info()` gRPC method. Thus, they will always be considered as part of the active store set. ## Risk @@ -85,7 +86,7 @@ The way this will need to be done should be as generic as possible so the design ## Work Plan -* Implement the new flag `--store.strict-mode` in Thanos Query which will make it keep around statically defined nodes. It will be disabled by default to reduce surprises when upgrading. +* Implement the new flag `--store-strict` in Thanos Query which will only accept statically defined nodes that will be permanently kept around. It is optional to use so there will be no surprises when upgrading. * Implement tests with dummy store nodes. * Document the new behavior. diff --git a/pkg/discovery/dns/provider.go b/pkg/discovery/dns/provider.go index 5df9e97320..0ef944574d 100644 --- a/pkg/discovery/dns/provider.go +++ b/pkg/discovery/dns/provider.go @@ -87,6 +87,23 @@ func (p *Provider) Clone() *Provider { } } +// IsDynamicNode returns if the specified StoreAPI addr uses +// any kind of SD mechanism. +func IsDynamicNode(addr string) bool { + qtype, _ := GetQTypeName(addr) + return qtype != "" +} + +// GetQTypeName splits the provided addr into two parts: the QType (if any) +// and the name. +func GetQTypeName(addr string) (qtype string, name string) { + qtypeAndName := strings.SplitN(addr, "+", 2) + if len(qtypeAndName) != 2 { + return "", addr + } + return qtypeAndName[0], qtypeAndName[1] +} + // Resolve stores a list of provided addresses or their DNS records if requested. // Addresses prefixed with `dns+` or `dnssrv+` will be resolved through respective DNS lookup (A/AAAA or SRV). // defaultPort is used for non-SRV records when a port is not supplied. 
@@ -100,14 +117,12 @@ func (p *Provider) Resolve(ctx context.Context, addrs []string) { resolvedAddrs := map[string][]string{} for _, addr := range addrs { var resolved []string - qtypeAndName := strings.SplitN(addr, "+", 2) - if len(qtypeAndName) != 2 { - // No lookup specified. Add to results and continue to the next address. - resolvedAddrs[addr] = []string{addr} - p.resolverAddrs.WithLabelValues(addr).Set(1.0) + qtype, name := GetQTypeName(addr) + if qtype == "" { + resolvedAddrs[name] = []string{name} + p.resolverAddrs.WithLabelValues(name).Set(1.0) continue } - qtype, name := qtypeAndName[0], qtypeAndName[1] resolved, err := p.resolver.Resolve(ctx, name, QType(qtype)) p.resolverLookupsCount.Inc() diff --git a/pkg/discovery/dns/provider_test.go b/pkg/discovery/dns/provider_test.go index 41f4e86cc5..585a7afb22 100644 --- a/pkg/discovery/dns/provider_test.go +++ b/pkg/discovery/dns/provider_test.go @@ -102,3 +102,35 @@ func (d *mockResolver) Resolve(_ context.Context, name string, _ QType) ([]strin } return d.res[name], nil } + +// TestIsDynamicNode tests whether we properly catch dynamically defined nodes. +func TestIsDynamicNode(t *testing.T) { + for _, tcase := range []struct { + node string + isDynamic bool + }{ + { + node: "1.2.3.4", + isDynamic: false, + }, + { + node: "gibberish+1.1.1.1+noa", + isDynamic: true, + }, + { + node: "", + isDynamic: false, + }, + { + node: "dns+aaa", + isDynamic: true, + }, + { + node: "dnssrv+asdasdsa", + isDynamic: true, + }, + } { + isDynamic := IsDynamicNode(tcase.node) + testutil.Equals(t, tcase.isDynamic, isDynamic, "mismatch between results") + } +} diff --git a/pkg/query/storeset.go b/pkg/query/storeset.go index 978a3485ed..5a4b52c4b4 100644 --- a/pkg/query/storeset.go +++ b/pkg/query/storeset.go @@ -36,6 +36,8 @@ type StoreSpec interface { // NOTE: It is implementation responsibility to retry until context timeout, but a caller responsibility to manage // given store connection. 
Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, storeType component.StoreAPI, err error) + // StrictStatic returns true if the StoreAPI has been statically defined and it is under a strict mode. + StrictStatic() bool } type StoreStatus struct { @@ -49,13 +51,19 @@ type StoreStatus struct { } type grpcStoreSpec struct { - addr string + addr string + strictstatic bool } // NewGRPCStoreSpec creates store pure gRPC spec. // It uses Info gRPC call to get Metadata. -func NewGRPCStoreSpec(addr string) StoreSpec { - return &grpcStoreSpec{addr: addr} +func NewGRPCStoreSpec(addr string, strictstatic bool) StoreSpec { + return &grpcStoreSpec{addr: addr, strictstatic: strictstatic} +} + +// StrictStatic returns true if the StoreAPI has been statically defined and it is under a strict mode. +func (s *grpcStoreSpec) StrictStatic() bool { + return s.strictstatic } func (s *grpcStoreSpec) Addr() string { @@ -320,7 +328,7 @@ func newStoreAPIStats() map[component.StoreAPI]map[string]int { } // Update updates the store set. It fetches current list of store specs from function and updates the fresh metadata -// from all stores. +// from all stores. Keeps around statically defined nodes that were defined with the strict mode. 
func (s *StoreSet) Update(ctx context.Context) { s.updateMtx.Lock() defer s.updateMtx.Unlock() @@ -334,14 +342,14 @@ func (s *StoreSet) Update(ctx context.Context) { level.Debug(s.logger).Log("msg", "starting updating storeAPIs", "cachedStores", len(stores)) - healthyStores := s.getHealthyStores(ctx, stores) - level.Debug(s.logger).Log("msg", "checked requested storeAPIs", "healthyStores", len(healthyStores), "cachedStores", len(stores)) + activeStores := s.getActiveStores(ctx, stores) + level.Debug(s.logger).Log("msg", "checked requested storeAPIs", "activeStores", len(activeStores), "cachedStores", len(stores)) stats := newStoreAPIStats() - // Close stores that where not healthy this time (are not in healthy stores map). + // Close stores that were not active this time (are not in active stores map). for addr, st := range stores { - if _, ok := healthyStores[addr]; ok { + if _, ok := activeStores[addr]; ok { stats[st.StoreType()][st.LabelSetsString()]++ continue } @@ -353,7 +361,7 @@ func (s *StoreSet) Update(ctx context.Context) { } // Add stores that are not yet in stores. - for addr, st := range healthyStores { + for addr, st := range activeStores { if _, ok := stores[addr]; ok { continue } @@ -384,15 +392,15 @@ func (s *StoreSet) Update(ctx context.Context) { s.cleanUpStoreStatuses(stores) } -func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*storeRef) map[string]*storeRef { +func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*storeRef) map[string]*storeRef { var ( - unique = make(map[string]struct{}) - healthyStores = make(map[string]*storeRef, len(stores)) - mtx sync.Mutex - wg sync.WaitGroup + unique = make(map[string]struct{}) + activeStores = make(map[string]*storeRef, len(stores)) + mtx sync.Mutex + wg sync.WaitGroup ) - // Gather healthy stores map concurrently. Build new store if does not exist already. + // Gather active stores map concurrently. Build new store if does not exist already. 
for _, storeSpec := range s.storeSpecs() { if _, ok := unique[storeSpec.Addr()]; ok { level.Warn(s.logger).Log("msg", "duplicated address in store nodes", "address", storeSpec.Addr()) @@ -411,7 +419,7 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*stor st, seenAlready := stores[addr] if !seenAlready { - // New store or was unhealthy and was removed in the past - create new one. + // New store or was inactive and was removed in the past - create new one. conn, err := grpc.DialContext(ctx, addr, s.dialOpts...) if err != nil { s.updateStoreStatus(&storeRef{addr: addr}, err) @@ -425,25 +433,36 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*stor labelSets, minTime, maxTime, storeType, err := spec.Metadata(ctx, st.StoreClient) if err != nil { if !seenAlready { - // Close only if new. Unhealthy `s.stores` will be closed later on. + // Close only if new. Inactive `s.stores` will be closed later on. st.Close() } s.updateStoreStatus(st, err) level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "getting metadata"), "address", addr) + + if !spec.StrictStatic() { + return + } + + // Still keep it around if static & strict mode enabled. 
+ mtx.Lock() + defer mtx.Unlock() + + activeStores[addr] = st return } + s.updateStoreStatus(st, nil) st.Update(labelSets, minTime, maxTime, storeType) mtx.Lock() defer mtx.Unlock() - healthyStores[addr] = st + activeStores[addr] = st }(storeSpec) } wg.Wait() - return healthyStores + return activeStores } func (s *StoreSet) updateStoreStatus(store *storeRef, err error) { diff --git a/pkg/query/storeset_test.go b/pkg/query/storeset_test.go index b3dd853dd5..ed367fd7c5 100644 --- a/pkg/query/storeset_test.go +++ b/pkg/query/storeset_test.go @@ -51,8 +51,9 @@ func (s *testStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque } type testStoreMeta struct { - extlsetFn func(addr string) []storepb.LabelSet - storeType component.StoreAPI + extlsetFn func(addr string) []storepb.LabelSet + storeType component.StoreAPI + minTime, maxTime int64 } type testStores struct { @@ -78,6 +79,8 @@ func startTestStores(storeMetas []testStoreMeta) (*testStores, error) { storeSrv := &testStore{ info: storepb.InfoResponse{ LabelSets: meta.extlsetFn(listener.Addr().String()), + MaxTime: meta.maxTime, + MinTime: meta.minTime, }, } if meta.storeType != nil { @@ -181,7 +184,7 @@ func TestStoreSet_Update(t *testing.T) { discoveredStoreAddr = append(discoveredStoreAddr, discoveredStoreAddr[0]) storeSet := NewStoreSet(nil, nil, func() (specs []StoreSpec) { for _, addr := range discoveredStoreAddr { - specs = append(specs, NewGRPCStoreSpec(addr)) + specs = append(specs, NewGRPCStoreSpec(addr, false)) } return specs }, testGRPCOpts, time.Minute) @@ -523,7 +526,7 @@ func TestStoreSet_Update_NoneAvailable(t *testing.T) { storeSet := NewStoreSet(nil, nil, func() (specs []StoreSpec) { for _, addr := range initialStoreAddr { - specs = append(specs, NewGRPCStoreSpec(addr)) + specs = append(specs, NewGRPCStoreSpec(addr, false)) } return specs }, testGRPCOpts, time.Minute) @@ -532,10 +535,89 @@ func TestStoreSet_Update_NoneAvailable(t *testing.T) { // Should not matter how many of these we 
run. storeSet.Update(context.Background()) storeSet.Update(context.Background()) - testutil.Assert(t, len(storeSet.stores) == 0, "none of services should respond just fine, so we expect no client to be ready.") + testutil.Equals(t, 0, len(storeSet.stores), "none of services should respond just fine, so we expect no client to be ready.") // Leak test will ensure that we don't keep client connection around. expected := newStoreAPIStats() testutil.Equals(t, expected, storeSet.storesMetric.storeNodes) } + +// TestQuerierStrict tests what happens when the strict mode is enabled/disabled. +func TestQuerierStrict(t *testing.T) { + defer leaktest.CheckTimeout(t, 5*time.Second)() + + st, err := startTestStores([]testStoreMeta{ + { + minTime: 12345, + maxTime: 54321, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + { + Name: "addr", + Value: addr, + }, + }, + }, + } + }, + storeType: component.Sidecar, + }, + { + minTime: 66666, + maxTime: 77777, + extlsetFn: func(addr string) []storepb.LabelSet { + return []storepb.LabelSet{ + { + Labels: []storepb.Label{ + { + Name: "addr", + Value: addr, + }, + }, + }, + } + }, + storeType: component.Sidecar, + }, + }) + + testutil.Ok(t, err) + defer st.Close() + + staticStoreAddr := st.StoreAddresses()[0] + storeSet := NewStoreSet(nil, nil, func() (specs []StoreSpec) { + return []StoreSpec{ + NewGRPCStoreSpec(st.StoreAddresses()[0], true), + NewGRPCStoreSpec(st.StoreAddresses()[1], false), + } + }, testGRPCOpts, time.Minute) + defer storeSet.Close() + storeSet.gRPCInfoCallTimeout = 1 * time.Second + + // Initial update. + storeSet.Update(context.Background()) + testutil.Equals(t, 2, len(storeSet.stores), "two clients must be available for running store nodes") + + // The store is statically defined + strict mode is enabled + // so its client + information must be retained. 
+ curMin, curMax := storeSet.stores[staticStoreAddr].minTime, storeSet.stores[staticStoreAddr].maxTime + testutil.Equals(t, int64(12345), curMin, "got incorrect minimum time") + testutil.Equals(t, int64(54321), curMax, "got incorrect maximum time") + + // Turn off the stores. + st.Close() + + // Update again many times. Should not matter WRT the static one. + storeSet.Update(context.Background()) + storeSet.Update(context.Background()) + storeSet.Update(context.Background()) + + // Check that the information is the same. + testutil.Equals(t, 1, len(storeSet.stores), "one client must remain available for a store node that is down") + testutil.Equals(t, curMin, storeSet.stores[staticStoreAddr].minTime, "minimum time reported by the store node is different") + testutil.Equals(t, curMax, storeSet.stores[staticStoreAddr].maxTime, "maximum time reported by the store node is different") + testutil.NotOk(t, storeSet.storeStatuses[staticStoreAddr].LastError) +} diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index 246d8bab81..7b1aaff477 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -80,7 +80,7 @@ func diffVarintSnappyDecode(input []byte) (index.Postings, error) { raw, err := snappy.Decode(nil, input[len(codecHeaderSnappy):]) if err != nil { - return nil, errors.Errorf("snappy decode: %w", err) + return nil, errors.Wrap(err, "snappy decode") } return newDiffVarintPostings(raw), nil From eb13b48de39260a2b401e5aa23ac44f97de8fab4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Mon, 30 Mar 2020 19:12:12 +0300 Subject: [PATCH 2/2] CHANGELOG: fix changelog item MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Giedrius Statkevičius --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6714e69e8e..9e7ce10e91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,7 +20,7 @@ We use 
*breaking* word for marking changes that are not backward compatible (rel ### Added -- [#2252](https://github.com/thanos-io/thanos/pull/2252) Query: add new `--store.strict-mode` flag. More information available [here](/docs/proposals/202001_thanos_query_health_handling.md). +- [#2252](https://github.com/thanos-io/thanos/pull/2252) Query: add new `--store-strict` flag. More information available [here](/docs/proposals/202001_thanos_query_health_handling.md). - [#2265](https://github.com/thanos-io/thanos/pull/2265) Compactor: Add `--wait-interval` to specify compaction wait interval between consecutive compact runs when `--wait` enabled. - [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication. We plan to add a smarter algorithm in the following weeks. - [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process.