From 3df327495c3070a8e80188e129aadb01ecb0a77a Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Thu, 23 Apr 2020 16:38:09 -0400 Subject: [PATCH] rebasing on master, re-running go mod vendor seemed to bring in some more changes... Signed-off-by: Ed Welch --- .../cortexproject/cortex/pkg/api/api.go | 4 +- .../cortex/pkg/compactor/compactor_ring.go | 2 +- .../cortexproject/cortex/pkg/cortex/cortex.go | 2 +- .../cortex/pkg/cortex/modules.go | 23 +- .../cortex/pkg/ingester/flush.go | 4 - .../cortex/pkg/ingester/ingester.go | 7 + .../cortex/pkg/ingester/metrics.go | 10 - .../cortexproject/cortex/pkg/ingester/wal.go | 38 +- .../pkg/querier/blocks_store_balanced_set.go | 71 ++++ .../pkg/querier/blocks_store_queryable.go | 327 ++++++++++++++++++ .../querier/blocks_store_replicated_set.go | 150 ++++++++ .../cortex/pkg/querier/querier.go | 14 + .../pkg/querier/store_gateway_client.go | 38 +- .../cortex/pkg/ruler/ruler_ring.go | 2 +- .../ruler/rules/objectclient/rule_store.go | 4 +- .../cortex/pkg/storage/tsdb/config.go | 2 + .../cortex/pkg/storegateway/gateway_ring.go | 14 +- 17 files changed, 670 insertions(+), 42 deletions(-) create mode 100644 vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_balanced_set.go create mode 100644 vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_queryable.go create mode 100644 vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_replicated_set.go diff --git a/vendor/github.com/cortexproject/cortex/pkg/api/api.go b/vendor/github.com/cortexproject/cortex/pkg/api/api.go index ad392849d28a..7aeb6e16f338 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/api/api.go +++ b/vendor/github.com/cortexproject/cortex/pkg/api/api.go @@ -7,6 +7,8 @@ import ( "regexp" "strings" + "github.com/prometheus/client_golang/prometheus" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/prometheus/common/route" @@ -177,7 +179,7 @@ func (a *API) RegisterIngester(i *ingester.Ingester, pushConfig 
distributor.Conf // match the Prometheus API but mirror it closely enough to justify their routing under the Prometheus // component/ func (a *API) RegisterPurger(store *purger.DeleteStore) { - deleteRequestHandler := purger.NewDeleteRequestHandler(store) + deleteRequestHandler := purger.NewDeleteRequestHandler(store, prometheus.DefaultRegisterer) a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/admin/tsdb/delete_series", http.HandlerFunc(deleteRequestHandler.AddDeleteRequestHandler), true, "PUT", "POST") a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/admin/tsdb/delete_series", http.HandlerFunc(deleteRequestHandler.GetAllDeleteRequestsHandler), true, "GET") diff --git a/vendor/github.com/cortexproject/cortex/pkg/compactor/compactor_ring.go b/vendor/github.com/cortexproject/cortex/pkg/compactor/compactor_ring.go index 6caa4bd501e7..4db5f2747883 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/compactor/compactor_ring.go +++ b/vendor/github.com/cortexproject/cortex/pkg/compactor/compactor_ring.go @@ -71,7 +71,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig { // Configure lifecycler lc.RingConfig = rc - lc.ListenPort = &cfg.ListenPort + lc.ListenPort = cfg.ListenPort lc.Addr = cfg.InstanceAddr lc.Port = cfg.InstancePort lc.ID = cfg.InstanceID diff --git a/vendor/github.com/cortexproject/cortex/pkg/cortex/cortex.go b/vendor/github.com/cortexproject/cortex/pkg/cortex/cortex.go index d441967e1f52..e79b102b13c1 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/cortex/cortex.go +++ b/vendor/github.com/cortexproject/cortex/pkg/cortex/cortex.go @@ -92,7 +92,7 @@ type Config struct { Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags. TSDB tsdb.Config `yaml:"tsdb"` Compactor compactor.Config `yaml:"compactor"` - StoreGateway storegateway.Config `yaml:"store_gateway" doc:"hidden"` // this component is not yet finished. 
+ StoreGateway storegateway.Config `yaml:"store_gateway"` DataPurgerConfig purger.Config `yaml:"purger"` Ruler ruler.Config `yaml:"ruler"` diff --git a/vendor/github.com/cortexproject/cortex/pkg/cortex/modules.go b/vendor/github.com/cortexproject/cortex/pkg/cortex/modules.go index adaeb89b3487..3975f84756ac 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/cortex/modules.go +++ b/vendor/github.com/cortexproject/cortex/pkg/cortex/modules.go @@ -224,7 +224,7 @@ func (t *Cortex) initStoreQueryable(cfg *Config) (services.Service, error) { return nil, nil } - if cfg.Storage.Engine == storage.StorageEngineTSDB { + if cfg.Storage.Engine == storage.StorageEngineTSDB && !cfg.TSDB.StoreGatewayEnabled { storeQueryable, err := querier.NewBlockQueryable(cfg.TSDB, cfg.Server.LogLevel, prometheus.DefaultRegisterer) if err != nil { return nil, err @@ -233,13 +233,28 @@ func (t *Cortex) initStoreQueryable(cfg *Config) (services.Service, error) { return storeQueryable, nil } + if cfg.Storage.Engine == storage.StorageEngineTSDB && cfg.TSDB.StoreGatewayEnabled { + // When running in single binary, if the blocks sharding is disabled and no custom + // store-gateway address has been configured, we can set it to the running process. 
+ if cfg.Target == All && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" { + cfg.Querier.StoreGatewayAddresses = fmt.Sprintf("127.0.0.1:%d", cfg.Server.GRPCListenPort) + } + + storeQueryable, err := querier.NewBlocksStoreQueryableFromConfig(cfg.Querier, cfg.StoreGateway, cfg.TSDB, util.Logger, prometheus.DefaultRegisterer) + if err != nil { + return nil, err + } + t.storeQueryable = storeQueryable + return storeQueryable, nil + } + return nil, fmt.Errorf("unknown storage engine '%s'", cfg.Storage.Engine) } func (t *Cortex) initIngester(cfg *Config) (serv services.Service, err error) { cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV - cfg.Ingester.LifecyclerConfig.ListenPort = &cfg.Server.GRPCListenPort + cfg.Ingester.LifecyclerConfig.ListenPort = cfg.Server.GRPCListenPort cfg.Ingester.TSDBEnabled = cfg.Storage.Engine == storage.StorageEngineTSDB cfg.Ingester.TSDBConfig = cfg.TSDB cfg.Ingester.ShardByAllLabels = cfg.Distributor.ShardByAllLabels @@ -278,7 +293,7 @@ func (t *Cortex) initStore(cfg *Config) (serv services.Service, err error) { return } - t.store, err = storage.NewStore(cfg.Storage, cfg.ChunkStore, cfg.Schema, t.overrides) + t.store, err = storage.NewStore(cfg.Storage, cfg.ChunkStore, cfg.Schema, t.overrides, prometheus.DefaultRegisterer) if err != nil { return } @@ -497,7 +512,7 @@ func (t *Cortex) initDataPurger(cfg *Config) (services.Service, error) { return nil, err } - t.dataPurger, err = purger.NewDataPurger(cfg.DataPurgerConfig, t.deletesStore, t.store, storageClient) + t.dataPurger, err = purger.NewDataPurger(cfg.DataPurgerConfig, t.deletesStore, t.store, storageClient, prometheus.DefaultRegisterer) if err != nil { return nil, err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/flush.go 
b/vendor/github.com/cortexproject/cortex/pkg/ingester/flush.go index 46de627fd76d..7dee669454be 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/flush.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/flush.go @@ -339,8 +339,6 @@ func (i *Ingester) flushChunks(ctx context.Context, userID string, fp model.Fing return err } - sizePerUser := i.metrics.chunkSizePerUser.WithLabelValues(userID) - countPerUser := i.metrics.chunksPerUser.WithLabelValues(userID) // Record statistics only when actual put request did not return error. for _, chunkDesc := range chunkDescs { utilization, length, size := chunkDesc.C.Utilization(), chunkDesc.C.Len(), chunkDesc.C.Size() @@ -348,8 +346,6 @@ func (i *Ingester) flushChunks(ctx context.Context, userID string, fp model.Fing i.metrics.chunkUtilization.Observe(utilization) i.metrics.chunkLength.Observe(float64(length)) i.metrics.chunkSize.Observe(float64(size)) - sizePerUser.Add(float64(size)) - countPerUser.Inc() i.metrics.chunkAge.Observe(model.Now().Sub(chunkDesc.FirstTime).Seconds()) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/ingester.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/ingester.go index 6fa1e0fa7fde..43eb996c3b41 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/ingester.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/ingester.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "net/http" + "os" "sync" "time" @@ -157,6 +158,12 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c } } + if cfg.WALConfig.WALEnabled || cfg.WALConfig.Recover { + if err := os.MkdirAll(cfg.WALConfig.Dir, os.ModePerm); err != nil { + return nil, err + } + } + i := &Ingester{ cfg: cfg, clientConfig: clientConfig, diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/metrics.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/metrics.go index f1b9548902b6..636fd9ac209b 100644 --- 
a/vendor/github.com/cortexproject/cortex/pkg/ingester/metrics.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/metrics.go @@ -45,8 +45,6 @@ type ingesterMetrics struct { chunkUtilization prometheus.Histogram chunkLength prometheus.Histogram chunkSize prometheus.Histogram - chunksPerUser *prometheus.CounterVec - chunkSizePerUser *prometheus.CounterVec chunkAge prometheus.Histogram memoryChunks prometheus.Gauge flushReasons *prometheus.CounterVec @@ -153,14 +151,6 @@ func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSD Help: "Distribution of stored chunk sizes (when stored).", Buckets: prometheus.ExponentialBuckets(500, 2, 5), // biggest bucket is 500*2^(5-1) = 8000 }), - chunksPerUser: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ingester_chunks_stored_total", - Help: "Total stored chunks per user.", - }, []string{"user"}), - chunkSizePerUser: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ingester_chunk_stored_bytes_total", - Help: "Total bytes stored in chunks per user.", - }, []string{"user"}), chunkAge: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_ingester_chunk_age_seconds", Help: "Distribution of chunk ages (when stored).", diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/wal.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/wal.go index 398a42388ba6..89a722a90898 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/wal.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/wal.go @@ -35,6 +35,8 @@ type WALConfig struct { Recover bool `yaml:"recover_from_wal"` Dir string `yaml:"wal_dir"` CheckpointDuration time.Duration `yaml:"checkpoint_duration"` + // We always checkpoint during shutdown. This option exists for the tests. 
+ checkpointDuringShutdown bool } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -44,6 +46,7 @@ func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.WALEnabled, "ingester.wal-enabled", false, "Enable writing of ingested data into WAL.") f.BoolVar(&cfg.CheckpointEnabled, "ingester.checkpoint-enabled", true, "Enable checkpointing of in-memory chunks. It should always be true when using normally. Set it to false iff you are doing some small tests as there is no mechanism to delete the old WAL yet if checkpoint is disabled.") f.DurationVar(&cfg.CheckpointDuration, "ingester.checkpoint-duration", 30*time.Minute, "Interval at which checkpoints should be created.") + cfg.checkpointDuringShutdown = true } // WAL interface allows us to have a no-op WAL when the WAL is disabled. @@ -69,11 +72,13 @@ type walWrapper struct { checkpointMtx sync.Mutex // Checkpoint metrics. - checkpointDeleteFail prometheus.Counter - checkpointDeleteTotal prometheus.Counter - checkpointCreationFail prometheus.Counter - checkpointCreationTotal prometheus.Counter - checkpointDuration prometheus.Summary + checkpointDeleteFail prometheus.Counter + checkpointDeleteTotal prometheus.Counter + checkpointCreationFail prometheus.Counter + checkpointCreationTotal prometheus.Counter + checkpointDuration prometheus.Summary + checkpointLoggedBytesTotal prometheus.Counter + walLoggedBytesTotal prometheus.Counter } // newWAL creates a WAL object. If the WAL is disabled, then the returned WAL is a no-op WAL. 
@@ -121,6 +126,14 @@ func newWAL(cfg WALConfig, userStatesFunc func() map[string]*userState, register Help: "Time taken to create a checkpoint.", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }) + w.checkpointLoggedBytesTotal = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingester_checkpoint_logged_bytes_total", + Help: "Total number of bytes written to disk for checkpointing.", + }) + w.walLoggedBytesTotal = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingester_wal_logged_bytes_total", + Help: "Total number of bytes written to disk for WAL records.", + }) w.wait.Add(1) go w.run() @@ -145,6 +158,7 @@ func (w *walWrapper) Log(record *Record) error { if err != nil { return err } + w.walLoggedBytesTotal.Add(float64(len(buf))) return w.wal.Log(buf) } } @@ -172,9 +186,11 @@ func (w *walWrapper) run() { level.Info(util.Logger).Log("msg", "checkpoint done", "time", elapsed.String()) w.checkpointDuration.Observe(elapsed.Seconds()) case <-w.quit: - level.Info(util.Logger).Log("msg", "creating checkpoint before shutdown") - if err := w.performCheckpoint(true); err != nil { - level.Error(util.Logger).Log("msg", "error checkpointing series during shutdown", "err", err) + if w.cfg.checkpointDuringShutdown { + level.Info(util.Logger).Log("msg", "creating checkpoint before shutdown") + if err := w.performCheckpoint(true); err != nil { + level.Error(util.Logger).Log("msg", "error checkpointing series during shutdown", "err", err) + } } return } @@ -396,7 +412,11 @@ func (w *walWrapper) checkpointSeries(cp *wal.WAL, userID string, fp model.Finge return wireChunks, err } - return wireChunks, cp.Log(buf) + err = cp.Log(buf) + if err == nil { + w.checkpointLoggedBytesTotal.Add(float64(len(buf))) + } + return wireChunks, err } type walRecoveryParameters struct { diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_balanced_set.go 
b/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_balanced_set.go new file mode 100644 index 000000000000..ff54044a1d55 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_balanced_set.go @@ -0,0 +1,71 @@ +package querier + +import ( + "context" + "fmt" + "math/rand" + "strings" + "time" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/discovery/dns" + "github.com/thanos-io/thanos/pkg/extprom" + + "github.com/cortexproject/cortex/pkg/ring/client" + "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" + "github.com/cortexproject/cortex/pkg/util/services" +) + +// BlocksStoreSet implementation used when the blocks are not sharded in the store-gateway +// and so requests are balanced across the set of store-gateway instances. +type blocksStoreBalancedSet struct { + services.Service + + serviceAddresses []string + clientsPool *client.Pool + dnsProvider *dns.Provider +} + +func newBlocksStoreBalancedSet(serviceAddresses []string, logger log.Logger, reg prometheus.Registerer) *blocksStoreBalancedSet { + const dnsResolveInterval = 10 * time.Second + + dnsProviderReg := extprom.WrapRegistererWithPrefix("cortex_storegateway_client_", reg) + + s := &blocksStoreBalancedSet{ + serviceAddresses: serviceAddresses, + dnsProvider: dns.NewProvider(logger, dnsProviderReg, dns.GolangResolverType), + clientsPool: newStoreGatewayClientPool(nil, logger, reg), + } + + s.Service = services.NewTimerService(dnsResolveInterval, s.starting, s.resolve, nil) + return s +} + +func (s *blocksStoreBalancedSet) starting(ctx context.Context) error { + // Initial DNS resolution. 
+ return s.resolve(ctx) +} + +func (s *blocksStoreBalancedSet) resolve(ctx context.Context) error { + s.dnsProvider.Resolve(ctx, s.serviceAddresses) + return nil +} + +func (s *blocksStoreBalancedSet) GetClientsFor(_ []*metadata.Meta) ([]storegatewaypb.StoreGatewayClient, error) { + addresses := s.dnsProvider.Addresses() + if len(addresses) == 0 { + return nil, fmt.Errorf("no address resolved for the store-gateway service addresses %s", strings.Join(s.serviceAddresses, ",")) + } + + // Pick a random address and return its client from the pool. + addr := addresses[rand.Intn(len(addresses))] + c, err := s.clientsPool.GetClientFor(addr) + if err != nil { + return nil, errors.Wrapf(err, "failed to get store-gateway client for %s", addr) + } + + return []storegatewaypb.StoreGatewayClient{c.(storegatewaypb.StoreGatewayClient)}, nil +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_queryable.go b/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_queryable.go new file mode 100644 index 000000000000..68b2f3f87774 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_queryable.go @@ -0,0 +1,327 @@ +package querier + +import ( + "context" + "io" + "sync" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/weaveworks/common/user" + "golang.org/x/sync/errgroup" + grpc_metadata "google.golang.org/grpc/metadata" + + "github.com/cortexproject/cortex/pkg/querier/series" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + 
"github.com/cortexproject/cortex/pkg/storegateway" + "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/services" + "github.com/cortexproject/cortex/pkg/util/spanlogger" +) + +var ( + errNoStoreGatewayAddress = errors.New("no store-gateway address configured") +) + +// BlocksStoreSet is the interface used to get the clients to query series on a set of blocks. +type BlocksStoreSet interface { + services.Service + + // GetClientsFor returns the store gateway clients that should be used to + // query the set of blocks in input. + GetClientsFor(metas []*metadata.Meta) ([]storegatewaypb.StoreGatewayClient, error) +} + +// BlocksFinder is the interface used to find blocks for a given user and time range. +type BlocksFinder interface { + services.Service + + // GetBlocks returns known blocks for userID containing samples within the range minT + // and maxT (milliseconds, both included). Returned blocks are sorted by MaxTime descending. + GetBlocks(userID string, minT, maxT int64) ([]*metadata.Meta, error) +} + +// BlocksStoreQueryable is a queryable which queries blocks storage via +// the store-gateway. +type BlocksStoreQueryable struct { + services.Service + + stores BlocksStoreSet + finder BlocksFinder + + // Subservices manager. + subservices *services.Manager + subservicesWatcher *services.FailureWatcher + + // Metrics. 
+ storesHit prometheus.Histogram +} + +func NewBlocksStoreQueryable(stores BlocksStoreSet, finder BlocksFinder, reg prometheus.Registerer) (*BlocksStoreQueryable, error) { + util.WarnExperimentalUse("Blocks storage engine") + + manager, err := services.NewManager(stores, finder) + if err != nil { + return nil, errors.Wrap(err, "register blocks storage queryable subservices") + } + + q := &BlocksStoreQueryable{ + stores: stores, + finder: finder, + subservices: manager, + subservicesWatcher: services.NewFailureWatcher(), + storesHit: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "querier_storegateway_instances_hit_per_query", + Help: "Number of store-gateway instances hit for a single query.", + Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + }), + } + + q.Service = services.NewBasicService(q.starting, q.running, q.stopping) + + return q, nil +} + +func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegateway.Config, storageCfg cortex_tsdb.Config, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) { + var stores BlocksStoreSet + + bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), storageCfg, "querier", logger) + if err != nil { + return nil, errors.Wrap(err, "failed to create bucket client") + } + + scanner := NewBlocksScanner(BlocksScannerConfig{ + ScanInterval: storageCfg.BucketStore.SyncInterval, + TenantsConcurrency: storageCfg.BucketStore.TenantSyncConcurrency, + MetasConcurrency: storageCfg.BucketStore.BlockSyncConcurrency, + CacheDir: storageCfg.BucketStore.SyncDir, + ConsistencyDelay: storageCfg.BucketStore.ConsistencyDelay, + IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay, + }, bucketClient, logger, reg) + + if gatewayCfg.ShardingEnabled { + storesRingCfg := gatewayCfg.ShardingRing.ToRingConfig() + storesRingBackend, err := kv.NewClient(storesRingCfg.KVStore, ring.GetCodec()) + if err != nil { + return nil, 
errors.Wrap(err, "failed to create store-gateway ring backend") + } + + storesRing, err := ring.NewWithStoreClientAndStrategy(storesRingCfg, storegateway.RingNameForClient, storegateway.RingKey, storesRingBackend, &storegateway.BlocksReplicationStrategy{}) + if err != nil { + return nil, errors.Wrap(err, "failed to create store-gateway ring client") + } + + if reg != nil { + reg.MustRegister(storesRing) + } + + stores, err = newBlocksStoreReplicationSet(storesRing, logger, reg) + if err != nil { + return nil, errors.Wrap(err, "failed to create store set") + } + } else { + if len(querierCfg.GetStoreGatewayAddresses()) == 0 { + return nil, errNoStoreGatewayAddress + } + + stores = newBlocksStoreBalancedSet(querierCfg.GetStoreGatewayAddresses(), logger, reg) + } + + return NewBlocksStoreQueryable(stores, scanner, reg) +} + +func (q *BlocksStoreQueryable) starting(ctx context.Context) error { + q.subservicesWatcher.WatchManager(q.subservices) + + if err := services.StartManagerAndAwaitHealthy(ctx, q.subservices); err != nil { + return errors.Wrap(err, "unable to start blocks storage queryable subservices") + } + + return nil +} + +func (q *BlocksStoreQueryable) running(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case err := <-q.subservicesWatcher.Chan(): + return errors.Wrap(err, "block storage queryable subservice failed") + } + } +} + +func (q *BlocksStoreQueryable) stopping(_ error) error { + return services.StopManagerAndAwaitStopped(context.Background(), q.subservices) +} + +// Querier returns a new Querier on the storage. 
+func (q *BlocksStoreQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + if s := q.State(); s != services.Running { + return nil, promql.ErrStorage{Err: errors.Errorf("BlocksStoreQueryable is not running: %v", s)} + } + + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, promql.ErrStorage{Err: err} + } + + return &blocksStoreQuerier{ + ctx: ctx, + minT: mint, + maxT: maxt, + userID: userID, + finder: q.finder, + stores: q.stores, + storesHit: q.storesHit, + }, nil +} + +type blocksStoreQuerier struct { + ctx context.Context + minT, maxT int64 + userID string + finder BlocksFinder + stores BlocksStoreSet + storesHit prometheus.Histogram +} + +func (q *blocksStoreQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return q.SelectSorted(sp, matchers...) +} + +func (q *blocksStoreQuerier) SelectSorted(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + set, warnings, err := q.selectSorted(sp, matchers...) + + // We need to wrap the error in order to have Prometheus returning a 5xx error. + if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + err = promql.ErrStorage{Err: err} + } + + return set, warnings, err +} + +func (q *blocksStoreQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { + // Cortex doesn't use this. It will ask ingesters for metadata. + return nil, nil, errors.New("not implemented") +} + +func (q *blocksStoreQuerier) LabelNames() ([]string, storage.Warnings, error) { + // Cortex doesn't use this. It will ask ingesters for metadata. 
+ return nil, nil, errors.New("not implemented") +} + +func (q *blocksStoreQuerier) Close() error { + return nil +} + +func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + log, _ := spanlogger.New(q.ctx, "blocksStoreQuerier.selectSorted") + defer log.Span.Finish() + + minT, maxT := q.minT, q.maxT + if sp != nil { + minT, maxT = sp.Start, sp.End + } + + // Find the list of blocks we need to query given the time range. + metas, err := q.finder.GetBlocks(q.userID, minT, maxT) + if err != nil { + return nil, nil, err + } + + if len(metas) == 0 { + if q.storesHit != nil { + q.storesHit.Observe(0) + } + + return series.NewEmptySeriesSet(), nil, nil + } + + // Find the set of store-gateway instances having the blocks. + clients, err := q.stores.GetClientsFor(metas) + if err != nil { + return nil, nil, err + } + + req := &storepb.SeriesRequest{ + MinTime: minT, + MaxTime: maxT, + Matchers: convertMatchersToLabelMatcher(matchers), + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + } + + var ( + reqCtx = grpc_metadata.AppendToOutgoingContext(q.ctx, cortex_tsdb.TenantIDExternalLabel, q.userID) + g, gCtx = errgroup.WithContext(reqCtx) + mtx = sync.Mutex{} + seriesSets = []storage.SeriesSet(nil) + warnings = storage.Warnings(nil) + ) + + // Concurrently fetch series from all clients. + for _, c := range clients { + // Change variable scope since it will be used in a goroutine. + c := c + + g.Go(func() error { + stream, err := c.Series(gCtx, req) + if err != nil { + return errors.Wrapf(err, "failed to fetch series from %s", c) + } + + mySeries := []*storepb.Series(nil) + myWarnings := storage.Warnings(nil) + + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + return errors.Wrapf(err, "failed to receive series from %s", c) + } + + // Response may either contain series or warning. If it's warning, we get nil here. 
+ if s := resp.GetSeries(); s != nil { + mySeries = append(mySeries, s) + } + + // Collect and return warnings too. + if w := resp.GetWarning(); w != "" { + myWarnings = append(myWarnings, errors.New(w)) + } + } + + // Store the result. + mtx.Lock() + seriesSets = append(seriesSets, &blockQuerierSeriesSet{series: mySeries}) + warnings = append(warnings, myWarnings...) + mtx.Unlock() + + return nil + }) + } + + // Wait until all client requests complete. + if err := g.Wait(); err != nil { + return nil, nil, err + } + + if q.storesHit != nil { + q.storesHit.Observe(float64(len(clients))) + } + + return storage.NewMergeSeriesSet(seriesSets, nil), warnings, nil +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_replicated_set.go b/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_replicated_set.go new file mode 100644 index 000000000000..ecaa0a4350b2 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_replicated_set.go @@ -0,0 +1,150 @@ +package querier + +import ( + "context" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/block/metadata" + + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/client" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" + "github.com/cortexproject/cortex/pkg/util/services" +) + +// BlocksStoreSet implementation used when the blocks are sharded and replicated across +// a set of store-gateway instances. +type blocksStoreReplicationSet struct { + services.Service + + storesRing *ring.Ring + clientsPool *client.Pool + + // Subservices manager. 
+ subservices *services.Manager + subservicesWatcher *services.FailureWatcher +} + +func newBlocksStoreReplicationSet(storesRing *ring.Ring, logger log.Logger, reg prometheus.Registerer) (*blocksStoreReplicationSet, error) { + s := &blocksStoreReplicationSet{ + storesRing: storesRing, + clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), logger, reg), + } + + var err error + s.subservices, err = services.NewManager(s.storesRing, s.clientsPool) + if err != nil { + return nil, err + } + + s.Service = services.NewBasicService(s.starting, s.running, s.stopping) + + return s, nil +} + +func (s *blocksStoreReplicationSet) starting(ctx context.Context) error { + s.subservicesWatcher.WatchManager(s.subservices) + + if err := services.StartManagerAndAwaitHealthy(ctx, s.subservices); err != nil { + return errors.Wrap(err, "unable to start blocks store set subservices") + } + + return nil +} + +func (s *blocksStoreReplicationSet) running(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case err := <-s.subservicesWatcher.Chan(): + return errors.Wrap(err, "blocks store set subservice failed") + } + } +} + +func (s *blocksStoreReplicationSet) stopping(_ error) error { + return services.StopManagerAndAwaitStopped(context.Background(), s.subservices) +} + +func (s *blocksStoreReplicationSet) GetClientsFor(metas []*metadata.Meta) ([]storegatewaypb.StoreGatewayClient, error) { + var sets []ring.ReplicationSet + + // Find the replication set of each block we need to query. + for _, m := range metas { + // Buffer internally used by the ring (give extra room for a JOINING + LEAVING instance). + // Do not reuse the same buffer across multiple Get() calls because we do retain the + // returned replication set. 
+ buf := make([]ring.IngesterDesc, 0, s.storesRing.ReplicationFactor()+2) + + set, err := s.storesRing.Get(cortex_tsdb.HashBlockID(m.ULID), ring.BlocksRead, buf) + if err != nil { + return nil, errors.Wrapf(err, "failed to get store-gateway replication set owning the block %s", m.ULID.String()) + } + + sets = append(sets, set) + } + + var clients []storegatewaypb.StoreGatewayClient + + // Get the client for each store-gateway. + for _, addr := range findSmallestInstanceSet(sets) { + c, err := s.clientsPool.GetClientFor(addr) + if err != nil { + return nil, errors.Wrapf(err, "failed to get store-gateway client for %s", addr) + } + + clients = append(clients, c.(storegatewaypb.StoreGatewayClient)) + } + + return clients, nil +} + +// findSmallestInstanceSet returns the minimal set of store-gateway instances including all required blocks. +// Blocks may be replicated across store-gateway instances, but we want to query the lowest number of instances +// possible, so this function tries to find the smallest set of store-gateway instances containing all blocks +// we need to query. +func findSmallestInstanceSet(sets []ring.ReplicationSet) []string { + addr := findHighestInstanceOccurrences(sets) + if addr == "" { + return nil + } + + // Remove any replication set containing the selected instance address. + for i := 0; i < len(sets); { + if sets[i].Includes(addr) { + sets = append(sets[:i], sets[i+1:]...) + } else { + i++ + } + } + + return append([]string{addr}, findSmallestInstanceSet(sets)...) 
+} + +func findHighestInstanceOccurrences(sets []ring.ReplicationSet) string { + var highestAddr string + var highestCount int + + occurrences := map[string]int{} + + for _, set := range sets { + for _, i := range set.Ingesters { + if v, ok := occurrences[i.Addr]; ok { + occurrences[i.Addr] = v + 1 + } else { + occurrences[i.Addr] = 1 + } + + if occurrences[i.Addr] > highestCount { + highestAddr = i.Addr + highestCount = occurrences[i.Addr] + } + } + } + + return highestAddr +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/querier.go b/vendor/github.com/cortexproject/cortex/pkg/querier/querier.go index bd0cec1545d1..3e2b3373a79e 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/querier/querier.go +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/querier.go @@ -4,6 +4,7 @@ import ( "context" "errors" "flag" + "strings" "time" "github.com/cortexproject/cortex/pkg/chunk/purger" @@ -48,6 +49,9 @@ type Config struct { // However, we need to use active query tracker, otherwise we cannot limit Max Concurrent queries in the PromQL // engine. ActiveQueryTrackerDir string `yaml:"active_query_tracker_dir"` + + // Blocks storage only. + StoreGatewayAddresses string `yaml:"store_gateway_addresses"` } var ( @@ -70,6 +74,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.") f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should only be queried from storage and not just ingesters. 0 means all queries are sent to store.") f.StringVar(&cfg.ActiveQueryTrackerDir, "querier.active-query-tracker-dir", "./active-query-tracker", "Active query tracker monitors active queries, and writes them to the file in given directory. If Cortex discovers any queries in this log during startup, it will log them to the log file. 
Setting to empty value disables active query tracker, which also disables -querier.max-concurrent option.") + f.StringVar(&cfg.StoreGatewayAddresses, "experimental.querier.store-gateway-addresses", "", "Comma separated list of store-gateway addresses in DNS Service Discovery format. This option should be set when using the experimental blocks storage and the store-gateway sharding is disabled (when enabled, the store-gateway instances form a ring and addresses are picked from the ring).") } // Validate the config @@ -85,6 +90,14 @@ func (cfg *Config) Validate() error { return nil } +func (cfg *Config) GetStoreGatewayAddresses() []string { + if cfg.StoreGatewayAddresses == "" { + return nil + } + + return strings.Split(cfg.StoreGatewayAddresses, ",") +} + func getChunksIteratorFunction(cfg Config) chunkIteratorFunc { if cfg.BatchIterators { return batch.NewChunkMergeIterator @@ -94,6 +107,7 @@ func getChunksIteratorFunction(cfg Config) chunkIteratorFunc { return mergeChunks } +// NewChunkStoreQueryable returns the storage.Queryable implementation against the chunks store. 
func NewChunkStoreQueryable(cfg Config, chunkStore chunkstore.ChunkStore) storage.Queryable { return newChunkStoreQueryable(chunkStore, getChunksIteratorFunction(cfg)) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/store_gateway_client.go b/vendor/github.com/cortexproject/cortex/pkg/querier/store_gateway_client.go index dd3f936b0b52..98d0ef0648b6 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/querier/store_gateway_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/store_gateway_client.go @@ -1,6 +1,9 @@ package querier import ( + "time" + + "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -12,11 +15,11 @@ import ( "github.com/cortexproject/cortex/pkg/util/grpcclient" ) -func NewStoreGatewayClientFactory(cfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory { +func newStoreGatewayClientFactory(cfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory { requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex", Name: "storegateway_client_request_duration_seconds", - Help: "Time spent executing requests on store-gateway.", + Help: "Time spent executing requests to the store-gateway.", Buckets: prometheus.ExponentialBuckets(0.008, 4, 7), ConstLabels: prometheus.Labels{"client": "querier"}, }, []string{"operation", "status_code"}) @@ -50,3 +53,34 @@ type storeGatewayClient struct { func (c *storeGatewayClient) Close() error { return c.conn.Close() } + +func (c *storeGatewayClient) String() string { + return c.conn.Target() +} + +func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, logger log.Logger, reg prometheus.Registerer) *client.Pool { + // We prefer sane defaults instead of exposing further config options. 
+ clientCfg := grpcclient.Config{ + MaxRecvMsgSize: 100 << 20, + MaxSendMsgSize: 16 << 20, + UseGzipCompression: false, + RateLimit: 0, + RateLimitBurst: 0, + BackoffOnRatelimits: false, + } + + poolCfg := client.PoolConfig{ + CheckInterval: time.Minute, + HealthCheckEnabled: true, + HealthCheckTimeout: 10 * time.Second, + } + + clientsCount := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Namespace: "cortex", + Name: "storegateway_clients", + Help: "The current number of store-gateway clients in the pool.", + ConstLabels: map[string]string{"client": "querier"}, + }) + + return client.NewPool("store-gateway", poolCfg, discovery, newStoreGatewayClientFactory(clientCfg, reg), clientsCount, logger) +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/ruler/ruler_ring.go b/vendor/github.com/cortexproject/cortex/pkg/ruler/ruler_ring.go index fb45cb2c8b1d..566474da126a 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ruler/ruler_ring.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ruler/ruler_ring.go @@ -76,7 +76,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig { // Configure lifecycler lc.RingConfig = rc - lc.ListenPort = &cfg.ListenPort + lc.ListenPort = cfg.ListenPort lc.Addr = cfg.InstanceAddr lc.Port = cfg.InstancePort lc.ID = cfg.InstanceID diff --git a/vendor/github.com/cortexproject/cortex/pkg/ruler/rules/objectclient/rule_store.go b/vendor/github.com/cortexproject/cortex/pkg/ruler/rules/objectclient/rule_store.go index f03a3d5dbf73..b76074efdefb 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ruler/rules/objectclient/rule_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ruler/rules/objectclient/rule_store.go @@ -64,7 +64,7 @@ func (o *RuleStore) getRuleGroup(ctx context.Context, objectKey string) (*rules. 
// ListAllRuleGroups returns all the active rule groups func (o *RuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rules.RuleGroupList, error) { - ruleGroupObjects, err := o.client.List(ctx, generateRuleObjectKey("", "", "")) + ruleGroupObjects, _, err := o.client.List(ctx, generateRuleObjectKey("", "", "")) if err != nil { return nil, err } @@ -93,7 +93,7 @@ func (o *RuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rules.Rul // ListRuleGroups returns all the active rule groups for a user func (o *RuleStore) ListRuleGroups(ctx context.Context, userID, namespace string) (rules.RuleGroupList, error) { - ruleGroupObjects, err := o.client.List(ctx, generateRuleObjectKey(userID, namespace, "")) + ruleGroupObjects, _, err := o.client.List(ctx, generateRuleObjectKey(userID, namespace, "")) if err != nil { return nil, err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/storage/tsdb/config.go b/vendor/github.com/cortexproject/cortex/pkg/storage/tsdb/config.go index 3bab9993b210..075249a97737 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/storage/tsdb/config.go +++ b/vendor/github.com/cortexproject/cortex/pkg/storage/tsdb/config.go @@ -58,6 +58,7 @@ type Config struct { HeadCompactionInterval time.Duration `yaml:"head_compaction_interval"` HeadCompactionConcurrency int `yaml:"head_compaction_concurrency"` StripeSize int `yaml:"stripe_size"` + StoreGatewayEnabled bool `yaml:"store_gateway_enabled"` // MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"` @@ -128,6 +129,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.HeadCompactionInterval, "experimental.tsdb.head-compaction-interval", 1*time.Minute, "How frequently does Cortex try to compact TSDB head. Block is only created if data covers smallest block range. 
Must be greater than 0 and max 5 minutes.") f.IntVar(&cfg.HeadCompactionConcurrency, "experimental.tsdb.head-compaction-concurrency", 5, "Maximum number of tenants concurrently compacting TSDB head into a new block") f.IntVar(&cfg.StripeSize, "experimental.tsdb.stripe-size", 16384, "The number of shards of series to use in TSDB (must be a power of 2). Reducing this will decrease memory footprint, but can negatively impact performance.") + f.BoolVar(&cfg.StoreGatewayEnabled, "experimental.tsdb.store-gateway-enabled", false, "True if the Cortex cluster is running the store-gateway service and the querier should query the bucket store via the store-gateway.") } // Validate the config. diff --git a/vendor/github.com/cortexproject/cortex/pkg/storegateway/gateway_ring.go b/vendor/github.com/cortexproject/cortex/pkg/storegateway/gateway_ring.go index 48e96d08ad53..7f34f0b203db 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/storegateway/gateway_ring.go +++ b/vendor/github.com/cortexproject/cortex/pkg/storegateway/gateway_ring.go @@ -61,18 +61,18 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { } // Ring flags - cfg.KVStore.RegisterFlagsWithPrefix("experimental.store-gateway.ring.", "collectors/", f) - f.DurationVar(&cfg.HeartbeatPeriod, "experimental.store-gateway.ring.heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring.") - f.DurationVar(&cfg.HeartbeatTimeout, "experimental.store-gateway.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which store gateways are considered unhealthy within the ring.") + cfg.KVStore.RegisterFlagsWithPrefix("experimental.store-gateway.sharding-ring.", "collectors/", f) + f.DurationVar(&cfg.HeartbeatPeriod, "experimental.store-gateway.sharding-ring.heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring.") + f.DurationVar(&cfg.HeartbeatTimeout, "experimental.store-gateway.sharding-ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which store 
gateways are considered unhealthy within the ring.") f.IntVar(&cfg.ReplicationFactor, "experimental.store-gateway.replication-factor", 3, "The replication factor to use when sharding blocks.") f.StringVar(&cfg.TokensFilePath, "experimental.store-gateway.tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.") // Instance flags cfg.InstanceInterfaceNames = []string{"eth0", "en0"} - f.Var((*flagext.Strings)(&cfg.InstanceInterfaceNames), "experimental.store-gateway.ring.instance-interface", "Name of network interface to read address from.") - f.StringVar(&cfg.InstanceAddr, "experimental.store-gateway.ring.instance-addr", "", "IP address to advertise in the ring.") - f.IntVar(&cfg.InstancePort, "experimental.store-gateway.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).") - f.StringVar(&cfg.InstanceID, "experimental.store-gateway.ring.instance-id", hostname, "Instance ID to register in the ring.") + f.Var((*flagext.Strings)(&cfg.InstanceInterfaceNames), "experimental.store-gateway.sharding-ring.instance-interface", "Name of network interface to read address from.") + f.StringVar(&cfg.InstanceAddr, "experimental.store-gateway.sharding-ring.instance-addr", "", "IP address to advertise in the ring.") + f.IntVar(&cfg.InstancePort, "experimental.store-gateway.sharding-ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).") + f.StringVar(&cfg.InstanceID, "experimental.store-gateway.sharding-ring.instance-id", hostname, "Instance ID to register in the ring.") // Defaults for internal settings. cfg.RingCheckPeriod = 5 * time.Second