From 287244d110554984406d32013fc2127ab8c10f36 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Mon, 1 Feb 2021 12:43:31 +0100 Subject: [PATCH] Update vendored Cortex to 0976147451ee (#3267) --- go.mod | 2 +- go.sum | 4 +- pkg/distributor/distributor_test.go | 2 +- pkg/ingester/client/client.go | 8 +- pkg/querier/querier_mock_test.go | 2 +- .../cortexproject/cortex/pkg/api/api.go | 24 +++++- .../pkg/chunk/gcp/bigtable_index_client.go | 16 +++- .../pkg/chunk/gcp/bigtable_object_client.go | 7 +- .../cortex/pkg/chunk/gcp/table_client.go | 7 +- .../cortex/pkg/frontend/v2/frontend.go | 8 +- .../cortex/pkg/ingester/client/client.go | 2 +- .../pkg/querier/blocks_store_balanced_set.go | 5 +- .../querier/blocks_store_replicated_set.go | 5 +- .../cortex/pkg/querier/querier.go | 5 +- .../pkg/querier/store_gateway_client.go | 27 ++++-- .../pkg/querier/worker/frontend_processor.go | 2 +- .../pkg/querier/worker/scheduler_processor.go | 4 +- .../cortex/pkg/querier/worker/worker.go | 2 +- .../cortexproject/cortex/pkg/ring/batch.go | 28 +++---- .../cortex/pkg/ring/kv/etcd/etcd.go | 28 +++---- .../cortex/pkg/ring/replication_strategy.go | 44 +++++----- .../cortexproject/cortex/pkg/ring/ring.go | 82 ++++++++++--------- .../cortexproject/cortex/pkg/ruler/ruler.go | 2 +- .../cortex/pkg/scheduler/scheduler.go | 2 +- .../cortex/pkg/util/grpcclient/grpcclient.go | 46 ++++------- .../cortex/pkg/util/push/push.go | 8 +- .../cortexproject/cortex/pkg/util/tls/tls.go | 22 ++--- vendor/modules.txt | 2 +- 28 files changed, 216 insertions(+), 180 deletions(-) diff --git a/go.mod b/go.mod index 3cc115315938..6bc5a0d5dff8 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/cespare/xxhash/v2 v2.1.1 github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448 // indirect github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf - github.com/cortexproject/cortex v1.6.1-0.20210128165026-dc1e6a800b51 + github.com/cortexproject/cortex v1.6.1-0.20210129172402-0976147451ee github.com/davecgh/go-spew v1.1.1 github.com/docker/docker v20.10.1+incompatible github.com/docker/go-metrics v0.0.0-20181218153428-b84716841b82 // indirect diff --git a/go.sum b/go.sum index d638c050e6bf..ca08b8e5bc94 100644 --- a/go.sum +++ b/go.sum @@ -288,8 +288,8 @@ github.com/cortexproject/cortex v1.2.1-0.20200805064754-d8edc95e2c91/go.mod h1:P github.com/cortexproject/cortex v1.3.1-0.20200923145333-8587ea61fe17/go.mod h1:dJ9gpW7dzQ7z09cKtNN9PfebumgyO4dtNdFQ6eQEed0= github.com/cortexproject/cortex v1.4.1-0.20201030080541-83ad6df2abea/go.mod h1:kXo5F3jlF7Ky3+I31jt/bXTzOlQjl2X/vGDpy0RY1gU= github.com/cortexproject/cortex v1.5.1-0.20201111110551-ba512881b076/go.mod h1:zFBGVsvRBfVp6ARXZ7pmiLaGlbjda5ZnA4Y6qSJyrQg= -github.com/cortexproject/cortex v1.6.1-0.20210128165026-dc1e6a800b51 h1:7gEEgx199PzIPPw6oICg6cWnQ9NwfkGHTpgIoXaflQU= -github.com/cortexproject/cortex v1.6.1-0.20210128165026-dc1e6a800b51/go.mod h1:uwptskTaCiJPGHaEsIthCBtnOA1nN+KpLDezYvbvU8o= +github.com/cortexproject/cortex v1.6.1-0.20210129172402-0976147451ee h1:Lj7kPgeuMHzoejxD4QQjYNMDqPNB5Uiqj0GvYaINnG0= +github.com/cortexproject/cortex v1.6.1-0.20210129172402-0976147451ee/go.mod h1:uwptskTaCiJPGHaEsIthCBtnOA1nN+KpLDezYvbvU8o= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw= diff --git a/pkg/distributor/distributor_test.go 
b/pkg/distributor/distributor_test.go index c8c15f630fca..05b09885bf8a 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -394,7 +394,7 @@ func (r mockRing) ReplicationFactor() int { return int(r.replicationFactor) } -func (r mockRing) IngesterCount() int { +func (r mockRing) InstancesCount() int { return len(r.ingesters) } diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 717876f29753..0f6259893ad5 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -52,7 +52,13 @@ func New(cfg Config, addr string) (HealthAndIngesterClient, error) { grpc.WithInsecure(), grpc.WithDefaultCallOptions(cfg.GRPCClientConfig.CallOptions()...), } - opts = append(opts, cfg.GRPCClientConfig.DialOption(instrumentation())...) + + dialOpts, err := cfg.GRPCClientConfig.DialOption(instrumentation()) + if err != nil { + return nil, err + } + + opts = append(opts, dialOpts...) conn, err := grpc.Dial(addr, opts...) if err != nil { return nil, err diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 103079ee5722..4afe8369615e 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -330,7 +330,7 @@ func (r *readRingMock) ReplicationFactor() int { return 1 } -func (r *readRingMock) IngesterCount() int { +func (r *readRingMock) InstancesCount() int { return len(r.replicationSet.Ingesters) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/api/api.go b/vendor/github.com/cortexproject/cortex/pkg/api/api.go index 12c14d24bc76..3f7d85dd782d 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/api/api.go +++ b/vendor/github.com/cortexproject/cortex/pkg/api/api.go @@ -35,6 +35,9 @@ import ( "github.com/cortexproject/cortex/pkg/util/push" ) +// DistributorPushWrapper wraps around a push. It is similar to middleware.Interface. +type DistributorPushWrapper func(next push.Func) push.Func + type Config struct { ResponseCompression bool `yaml:"response_compression_enabled"` @@ -45,6 +48,10 @@ type Config struct { ServerPrefix string `yaml:"-"` LegacyHTTPPrefix string `yaml:"-"` HTTPAuthMiddleware middleware.Interface `yaml:"-"` + + // This allows downstream projects to wrap the distributor push function + // and access the deserialized write requests before/after they are pushed. + DistributorPushWrapper DistributorPushWrapper `yaml:"-"` } // RegisterFlags adds the flags required to config this to the given FlagSet. @@ -59,6 +66,15 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.PrometheusHTTPPrefix, prefix+"http.prometheus-http-prefix", "/prometheus", "HTTP URL path under which the Prometheus api will be served.") } +// Push either wraps the distributor push function as configured or returns the distributor push directly. 
+func (cfg *Config) wrapDistributorPush(d *distributor.Distributor) push.Func { + if cfg.DistributorPushWrapper != nil { + return cfg.DistributorPushWrapper(d.Push) + } + + return d.Push +} + type API struct { AuthMiddleware middleware.Interface @@ -189,7 +205,7 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) { func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config) { client.RegisterPushOnlyIngesterServer(a.server.GRPC, d) - a.RegisterRoute("/api/v1/push", push.Handler(pushConfig, a.sourceIPs, d.Push), true, "POST") + a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ha_tracker", "HA Tracking Status") @@ -198,7 +214,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET") // Legacy Routes - a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/push", push.Handler(pushConfig, a.sourceIPs, d.Push), true, "POST") + a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET") a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET") } @@ -220,12 +236,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) { a.indexPage.AddLink(SectionDangerous, "/ingester/shutdown", "Trigger Ingester Shutdown (Dangerous)") a.RegisterRoute("/ingester/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST") a.RegisterRoute("/ingester/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST") - a.RegisterRoute("/ingester/push", push.Handler(pushConfig, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. + a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. // Legacy Routes a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST") a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST") - a.RegisterRoute("/push", push.Handler(pushConfig, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. + a.RegisterRoute("/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging. } // RegisterChunksPurger registers the endpoints associated with the Purger/DeleteStore. 
They do not exactly match the Prometheus API but mirror it closely enough to justify their routing under the Prometheus component. diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go index 7c6f0df5bf05..8992f358d5ed 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go @@ -51,6 +51,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.TableCacheEnabled, "bigtable.table-cache.enabled", true, "If enabled, once a table's info is fetched, it is cached.") f.DurationVar(&cfg.TableCacheExpiration, "bigtable.table-cache.expiration", 30*time.Minute, "Duration to cache tables before checking again.") + // This overrides our default from TLS disabled to TLS enabled + cfg.GRPCClientConfig.TLSEnabled = true cfg.GRPCClientConfig.RegisterFlagsWithPrefix("bigtable", f) } @@ -73,8 +75,11 @@ type storageClientV1 struct { // NewStorageClientV1 returns a new v1 StorageClient. func NewStorageClientV1(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) { - opts := toOptions(cfg.GRPCClientConfig.DialOption(bigtableInstrumentation())) - client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...) + dialOpts, err := cfg.GRPCClientConfig.DialOption(bigtableInstrumentation()) + if err != nil { + return nil, err + } + client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, toOptions(dialOpts)...) if err != nil { return nil, err } @@ -97,8 +102,11 @@ func newStorageClientV1(cfg Config, schemaCfg chunk.SchemaConfig, client *bigtab // NewStorageClientColumnKey returns a new v2 StorageClient. func NewStorageClientColumnKey(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) { - opts := toOptions(cfg.GRPCClientConfig.DialOption(bigtableInstrumentation())) - client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...) + dialOpts, err := cfg.GRPCClientConfig.DialOption(bigtableInstrumentation()) + if err != nil { + return nil, err + } + client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, toOptions(dialOpts)...) if err != nil { return nil, err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_object_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_object_client.go index ee722e1fa75a..2a18195a4b9f 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_object_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_object_client.go @@ -22,8 +22,11 @@ type bigtableObjectClient struct { // NewBigtableObjectClient makes a new chunk.Client that stores chunks in // Bigtable. func NewBigtableObjectClient(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.Client, error) { - opts := toOptions(cfg.GRPCClientConfig.DialOption(bigtableInstrumentation())) - client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...) + dialOpts, err := cfg.GRPCClientConfig.DialOption(bigtableInstrumentation()) + if err != nil { + return nil, err + } + client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, toOptions(dialOpts)...)
if err != nil { return nil, err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/table_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/table_client.go index 268e087c579a..26d032b48312 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/table_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/table_client.go @@ -24,8 +24,11 @@ type tableClient struct { // NewTableClient returns a new TableClient. func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error) { - opts := toOptions(cfg.GRPCClientConfig.DialOption(bigtableInstrumentation())) - client, err := bigtable.NewAdminClient(ctx, cfg.Project, cfg.Instance, opts...) + dialOpts, err := cfg.GRPCClientConfig.DialOption(bigtableInstrumentation()) + if err != nil { + return nil, err + } + client, err := bigtable.NewAdminClient(ctx, cfg.Project, cfg.Instance, toOptions(dialOpts)...) if err != nil { return nil, err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/frontend/v2/frontend.go b/vendor/github.com/cortexproject/cortex/pkg/frontend/v2/frontend.go index c369c59d82c9..268eab742675 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/frontend/v2/frontend.go +++ b/vendor/github.com/cortexproject/cortex/pkg/frontend/v2/frontend.go @@ -29,10 +29,10 @@ import ( // Config for a Frontend. type Config struct { - SchedulerAddress string `yaml:"scheduler_address"` - DNSLookupPeriod time.Duration `yaml:"scheduler_dns_lookup_period"` - WorkerConcurrency int `yaml:"scheduler_worker_concurrency"` - GRPCClientConfig grpcclient.ConfigWithTLS `yaml:"grpc_client_config"` + SchedulerAddress string `yaml:"scheduler_address"` + DNSLookupPeriod time.Duration `yaml:"scheduler_dns_lookup_period"` + WorkerConcurrency int `yaml:"scheduler_worker_concurrency"` + GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` // Used to find local IP address, that is sent to scheduler and querier-worker. InfNames []string `yaml:"instance_interface_names"` diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go index 7ae169785fc8..6079d3b71615 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go @@ -55,7 +55,7 @@ func (c *closableHealthAndIngesterClient) Close() error { // Config is the configuration struct for the ingester client type Config struct { - GRPCClientConfig grpcclient.ConfigWithTLS `yaml:"grpc_client_config"` + GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` } // RegisterFlags registers configuration settings used by the ingester client config. 
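This is the recurring shape of the change: grpcclient.Config.DialOption now returns ([]grpc.DialOption, error) rather than a bare slice, because building the TLS credentials can fail, so every call site gains an error check. A minimal sketch of a post-patch call site, assuming a populated grpcclient.Config (the dial helper and the address below are illustrative, not part of the patch):

package main

import (
	"fmt"

	"google.golang.org/grpc"

	"github.com/cortexproject/cortex/pkg/util/grpcclient"
)

// dial demonstrates the new two-step pattern: build the dial options first
// (this step can now fail, e.g. while loading TLS certificates), then dial.
func dial(cfg grpcclient.Config, addr string) (*grpc.ClientConn, error) {
	dialOpts, err := cfg.DialOption(nil, nil) // no extra interceptors in this sketch
	if err != nil {
		return nil, err
	}
	// With cfg.TLSEnabled == false the returned options include
	// grpc.WithInsecure(), so behaviour matches the pre-patch default.
	return grpc.Dial(addr, dialOpts...)
}

func main() {
	conn, err := dial(grpcclient.Config{MaxRecvMsgSize: 100 << 20, MaxSendMsgSize: 16 << 20}, "localhost:9095")
	if err != nil {
		fmt.Println("dial failed:", err)
		return
	}
	defer conn.Close()
}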
diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_balanced_set.go b/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_balanced_set.go index 4221c3a70442..0e408dcbf10b 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_balanced_set.go +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_balanced_set.go @@ -18,7 +18,6 @@ import ( "github.com/cortexproject/cortex/pkg/ring/client" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" - "github.com/cortexproject/cortex/pkg/util/tls" ) // BlocksStoreSet implementation used when the blocks are not sharded in the store-gateway @@ -31,7 +30,7 @@ type blocksStoreBalancedSet struct { dnsProvider *dns.Provider } -func newBlocksStoreBalancedSet(serviceAddresses []string, tlsCfg tls.ClientConfig, logger log.Logger, reg prometheus.Registerer) *blocksStoreBalancedSet { +func newBlocksStoreBalancedSet(serviceAddresses []string, clientConfig ClientConfig, logger log.Logger, reg prometheus.Registerer) *blocksStoreBalancedSet { const dnsResolveInterval = 10 * time.Second dnsProviderReg := extprom.WrapRegistererWithPrefix("cortex_storegateway_client_", reg) @@ -39,7 +38,7 @@ func newBlocksStoreBalancedSet(serviceAddresses []string, tlsCfg tls.ClientConfi s := &blocksStoreBalancedSet{ serviceAddresses: serviceAddresses, dnsProvider: dns.NewProvider(logger, dnsProviderReg, dns.GolangResolverType), - clientsPool: newStoreGatewayClientPool(nil, tlsCfg, logger, reg), + clientsPool: newStoreGatewayClientPool(nil, clientConfig, logger, reg), } s.Service = services.NewTimerService(dnsResolveInterval, s.starting, s.resolve, nil) diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_replicated_set.go b/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_replicated_set.go index 7f0234a9bb12..bf6f2f847ce6 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_replicated_set.go +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_replicated_set.go @@ -15,7 +15,6 @@ import ( "github.com/cortexproject/cortex/pkg/storegateway" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" - "github.com/cortexproject/cortex/pkg/util/tls" ) // BlocksStoreSet implementation used when the blocks are sharded and replicated across @@ -37,13 +36,13 @@ func newBlocksStoreReplicationSet( storesRing *ring.Ring, shardingStrategy string, limits BlocksStoreLimits, - tlsCfg tls.ClientConfig, + clientConfig ClientConfig, logger log.Logger, reg prometheus.Registerer, ) (*blocksStoreReplicationSet, error) { s := &blocksStoreReplicationSet{ storesRing: storesRing, - clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), tlsCfg, logger, reg), + clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg), shardingStrategy: shardingStrategy, limits: limits, } diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/querier.go b/vendor/github.com/cortexproject/cortex/pkg/querier/querier.go index cce1f521186d..ce8d12ee21e3 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/querier/querier.go +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/querier.go @@ -29,7 +29,6 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/spanlogger" - "github.com/cortexproject/cortex/pkg/util/tls" 
"github.com/cortexproject/cortex/pkg/util/validation" ) @@ -63,8 +62,8 @@ type Config struct { LookbackDelta time.Duration `yaml:"lookback_delta"` // Blocks storage only. - StoreGatewayAddresses string `yaml:"store_gateway_addresses"` - StoreGatewayClient tls.ClientConfig `yaml:"store_gateway_client"` + StoreGatewayAddresses string `yaml:"store_gateway_addresses"` + StoreGatewayClient ClientConfig `yaml:"store_gateway_client"` SecondStoreEngine string `yaml:"second_store_engine"` UseSecondStoreBeforeTime flagext.Time `yaml:"use_second_store_before_time"` diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/store_gateway_client.go b/vendor/github.com/cortexproject/cortex/pkg/querier/store_gateway_client.go index 1571117d9c23..e956a83ec598 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/querier/store_gateway_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/store_gateway_client.go @@ -1,6 +1,7 @@ package querier import ( + "flag" "time" "github.com/go-kit/kit/log" @@ -16,7 +17,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/tls" ) -func newStoreGatewayClientFactory(clientCfg grpcclient.Config, tlsCfg tls.ClientConfig, reg prometheus.Registerer) client.PoolFactory { +func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory { requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex", Name: "storegateway_client_request_duration_seconds", @@ -26,16 +27,16 @@ func newStoreGatewayClientFactory(clientCfg grpcclient.Config, tlsCfg tls.Client }, []string{"operation", "status_code"}) return func(addr string) (client.PoolClient, error) { - return dialStoreGatewayClient(clientCfg, tlsCfg, addr, requestDuration) + return dialStoreGatewayClient(clientCfg, addr, requestDuration) } } -func dialStoreGatewayClient(clientCfg grpcclient.Config, tlsCfg tls.ClientConfig, addr string, requestDuration *prometheus.HistogramVec) (*storeGatewayClient, error) { - opts, err := tlsCfg.GetGRPCDialOptions() +func dialStoreGatewayClient(clientCfg grpcclient.Config, addr string, requestDuration *prometheus.HistogramVec) (*storeGatewayClient, error) { + opts, err := clientCfg.DialOption(grpcclient.Instrument(requestDuration)) if err != nil { return nil, err } - opts = append(opts, clientCfg.DialOption(grpcclient.Instrument(requestDuration))...) + conn, err := grpc.Dial(addr, opts...) if err != nil { return nil, errors.Wrapf(err, "failed to dial store-gateway %s", addr) @@ -66,7 +67,7 @@ func (c *storeGatewayClient) RemoteAddress() string { return c.conn.Target() } -func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, tlsCfg tls.ClientConfig, logger log.Logger, reg prometheus.Registerer) *client.Pool { +func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConfig ClientConfig, logger log.Logger, reg prometheus.Registerer) *client.Pool { // We prefer sane defaults instead of exposing further config options. 
clientCfg := grpcclient.Config{ MaxRecvMsgSize: 100 << 20, @@ -75,6 +76,8 @@ func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, tlsCfg tls RateLimit: 0, RateLimitBurst: 0, BackoffOnRatelimits: false, + TLSEnabled: clientConfig.TLSEnabled, + TLS: clientConfig.TLS, } poolCfg := client.PoolConfig{ CheckInterval: time.Minute, @@ -89,5 +92,15 @@ func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, tlsCfg tls ConstLabels: map[string]string{"client": "querier"}, }) - return client.NewPool("store-gateway", poolCfg, discovery, newStoreGatewayClientFactory(clientCfg, tlsCfg, reg), clientsCount, logger) + return client.NewPool("store-gateway", poolCfg, discovery, newStoreGatewayClientFactory(clientCfg, reg), clientsCount, logger) +} + +type ClientConfig struct { + TLSEnabled bool `yaml:"tls_enabled"` + TLS tls.ClientConfig `yaml:",inline"` +} + +func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", cfg.TLSEnabled, "Enable TLS for gRPC client connecting to store-gateway.") + cfg.TLS.RegisterFlagsWithPrefix(prefix, f) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/worker/frontend_processor.go b/vendor/github.com/cortexproject/cortex/pkg/querier/worker/frontend_processor.go index 820f4ef1bc4a..cda955d6a6cf 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/querier/worker/frontend_processor.go +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/worker/frontend_processor.go @@ -28,7 +28,7 @@ func newFrontendProcessor(cfg Config, handler RequestHandler, log log.Logger) pr return &frontendProcessor{ log: log, handler: handler, - maxMessageSize: cfg.GRPCClientConfig.GRPC.MaxSendMsgSize, + maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize, querierID: cfg.QuerierID, } } diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/worker/scheduler_processor.go b/vendor/github.com/cortexproject/cortex/pkg/querier/worker/scheduler_processor.go index 73a7e519e559..02ff2387864f 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/querier/worker/scheduler_processor.go +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/worker/scheduler_processor.go @@ -35,7 +35,7 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r p := &schedulerProcessor{ log: log, handler: handler, - maxMessageSize: cfg.GRPCClientConfig.GRPC.MaxSendMsgSize, + maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize, querierID: cfg.QuerierID, grpcConfig: cfg.GRPCClientConfig, @@ -65,7 +65,7 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r type schedulerProcessor struct { log log.Logger handler RequestHandler - grpcConfig grpcclient.ConfigWithTLS + grpcConfig grpcclient.Config maxMessageSize int querierID string diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/worker/worker.go b/vendor/github.com/cortexproject/cortex/pkg/querier/worker/worker.go index 289aaca11e19..a18ec7564ed4 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/querier/worker/worker.go +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/worker/worker.go @@ -30,7 +30,7 @@ type Config struct { QuerierID string `yaml:"id"` - GRPCClientConfig grpcclient.ConfigWithTLS `yaml:"grpc_client_config"` + GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/batch.go b/vendor/github.com/cortexproject/cortex/pkg/ring/batch.go index 
69672269fe9e..3990bdd4dd30 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ring/batch.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ring/batch.go @@ -15,7 +15,7 @@ type batchTracker struct { err chan error } -type ingester struct { +type instance struct { desc IngesterDesc itemTrackers []*itemTracker indexes []int @@ -30,21 +30,21 @@ type itemTracker struct { // DoBatch request against a set of keys in the ring, handling replication and // failures. For example if we want to write N items where they may all -// hit different ingesters, and we want them all replicated R ways with +// hit different instances, and we want them all replicated R ways with // quorum writes, we track the relationship between batch RPCs and the items // within them. // -// Callback is passed the ingester to target, and the indexes of the keys -// to send to that ingester. +// Callback is passed the instance to target, and the indexes of the keys +// to send to that instance. // // Not implemented as a method on Ring so we can test separately. func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error { - if r.IngesterCount() <= 0 { - return fmt.Errorf("DoBatch: IngesterCount <= 0") + if r.InstancesCount() <= 0 { + return fmt.Errorf("DoBatch: InstancesCount <= 0") } - expectedTrackers := len(keys) * (r.ReplicationFactor() + 1) / r.IngesterCount() + expectedTrackers := len(keys) * (r.ReplicationFactor() + 1) / r.InstancesCount() itemTrackers := make([]itemTracker, len(keys)) - ingesters := make(map[string]ingester, r.IngesterCount()) + instances := make(map[string]instance, r.InstancesCount()) var ( bufDescs [GetBufferSize]IngesterDesc @@ -60,12 +60,12 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb itemTrackers[i].maxFailures = replicationSet.MaxErrors for _, desc := range replicationSet.Ingesters { - curr, found := ingesters[desc.Addr] + curr, found := instances[desc.Addr] if !found { curr.itemTrackers = make([]*itemTracker, 0, expectedTrackers) curr.indexes = make([]int, 0, expectedTrackers) } - ingesters[desc.Addr] = ingester{ + instances[desc.Addr] = instance{ desc: desc, itemTrackers: append(curr.itemTrackers, &itemTrackers[i]), indexes: append(curr.indexes, i), @@ -81,9 +81,9 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb var wg sync.WaitGroup - wg.Add(len(ingesters)) - for _, i := range ingesters { - go func(i ingester) { + wg.Add(len(instances)) + for _, i := range instances { + go func(i instance) { err := callback(i.desc, i.indexes) tracker.record(i.itemTrackers, err) wg.Done() @@ -111,7 +111,7 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) { // If we succeed, decrement each sample's pending count by one. If we reach // the required number of successful puts on this sample, then decrement the // number of pending samples by one. If we successfully push all samples to - // min success ingesters, wake up the waiting rpc so it can return early. + // min success instances, wake up the waiting rpc so it can return early. // Similarly, track the number of errors, and if it exceeds maxFailures // shortcut the waiting rpc. 
// diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/kv/etcd/etcd.go b/vendor/github.com/cortexproject/cortex/pkg/ring/kv/etcd/etcd.go index f63ccf819885..fa1e61732689 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ring/kv/etcd/etcd.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ring/kv/etcd/etcd.go @@ -15,18 +15,16 @@ import ( "github.com/cortexproject/cortex/pkg/ring/kv/codec" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" + cortex_tls "github.com/cortexproject/cortex/pkg/util/tls" ) // Config for a new etcd.Client. type Config struct { - Endpoints []string `yaml:"endpoints"` - DialTimeout time.Duration `yaml:"dial_timeout"` - MaxRetries int `yaml:"max_retries"` - EnableTLS bool `yaml:"tls_enabled"` - CertFile string `yaml:"tls_cert_path"` - KeyFile string `yaml:"tls_key_path"` - TrustedCAFile string `yaml:"tls_ca_path"` - InsecureSkipVerify bool `yaml:"tls_insecure_skip_verify"` + Endpoints []string `yaml:"endpoints"` + DialTimeout time.Duration `yaml:"dial_timeout"` + MaxRetries int `yaml:"max_retries"` + EnableTLS bool `yaml:"tls_enabled"` + TLS cortex_tls.ClientConfig `yaml:",inline"` } // Client implements ring.KVClient for etcd. @@ -43,10 +41,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.DurationVar(&cfg.DialTimeout, prefix+"etcd.dial-timeout", 10*time.Second, "The dial timeout for the etcd connection.") f.IntVar(&cfg.MaxRetries, prefix+"etcd.max-retries", 10, "The maximum number of retries to do for failed ops.") f.BoolVar(&cfg.EnableTLS, prefix+"etcd.tls-enabled", false, "Enable TLS.") - f.StringVar(&cfg.CertFile, prefix+"etcd.tls-cert-path", "", "The TLS certificate file path.") - f.StringVar(&cfg.KeyFile, prefix+"etcd.tls-key-path", "", "The TLS private key file path.") - f.StringVar(&cfg.TrustedCAFile, prefix+"etcd.tls-ca-path", "", "The trusted CA file path.") - f.BoolVar(&cfg.InsecureSkipVerify, prefix+"etcd.tls-insecure-skip-verify", false, "Skip validating server certificate.") + cfg.TLS.RegisterFlagsWithPrefix(prefix+"etcd", f) } // GetTLS sets the TLS config field with certs @@ -55,10 +50,11 @@ func (cfg *Config) GetTLS() (*tls.Config, error) { return nil, nil } tlsInfo := &transport.TLSInfo{ - CertFile: cfg.CertFile, - KeyFile: cfg.KeyFile, - TrustedCAFile: cfg.TrustedCAFile, - InsecureSkipVerify: cfg.InsecureSkipVerify, + CertFile: cfg.TLS.CertPath, + KeyFile: cfg.TLS.KeyPath, + TrustedCAFile: cfg.TLS.CAPath, + ServerName: cfg.TLS.ServerName, + InsecureSkipVerify: cfg.TLS.InsecureSkipVerify, } return tlsInfo.ClientConfig() } diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/replication_strategy.go b/vendor/github.com/cortexproject/cortex/pkg/ring/replication_strategy.go index 5156ba22d126..f28a54e61827 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ring/replication_strategy.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ring/replication_strategy.go @@ -20,49 +20,49 @@ func NewDefaultReplicationStrategy() ReplicationStrategy { return &defaultReplicationStrategy{} } -// Filter decides, given the set of ingesters eligible for a key, -// which ingesters you will try and write to and how many failures you will +// Filter decides, given the set of instances eligible for a key, +// which instances you will try and write to and how many failures you will // tolerate. -// - Filters out dead ingesters so the one doesn't even try to write to them. -// - Checks there is enough ingesters for an operation to succeed. 
-// The ingesters argument may be overwritten. -func (s *defaultReplicationStrategy) Filter(ingesters []IngesterDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) ([]IngesterDesc, int, error) { - // We need a response from a quorum of ingesters, which is n/2 + 1. In the +// - Filters out unhealthy instances so we don't even try to write to them. +// - Checks there are enough instances for an operation to succeed. +// The instances argument may be overwritten. +func (s *defaultReplicationStrategy) Filter(instances []IngesterDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) ([]IngesterDesc, int, error) { + // We need a response from a quorum of instances, which is n/2 + 1. In the // case of a node joining/leaving, the actual replica set might be bigger // than the replication factor, so use the bigger of the two. - if len(ingesters) > replicationFactor { - replicationFactor = len(ingesters) + if len(instances) > replicationFactor { + replicationFactor = len(instances) } minSuccess := (replicationFactor / 2) + 1 now := time.Now() // Skip those that have not heartbeated in a while. NB these are still - // included in the calculation of minSuccess, so if too many failed ingesters + // included in the calculation of minSuccess, so too many failed instances // will cause the whole write to fail. - for i := 0; i < len(ingesters); { - if ingesters[i].IsHealthy(op, heartbeatTimeout, now) { + for i := 0; i < len(instances); { + if instances[i].IsHealthy(op, heartbeatTimeout, now) { i++ } else { - ingesters = append(ingesters[:i], ingesters[i+1:]...) + instances = append(instances[:i], instances[i+1:]...) } } - // This is just a shortcut - if there are not minSuccess available instances, + // This is just a shortcut - if there are not minSuccess available instances, // after filtering out dead ones, don't even bother trying. - if len(ingesters) < minSuccess { + if len(instances) < minSuccess { var err error if zoneAwarenessEnabled { - err = fmt.Errorf("at least %d live replicas required across different availability zones, could only find %d", minSuccess, len(ingesters)) + err = fmt.Errorf("at least %d live replicas required across different availability zones, could only find %d", minSuccess, len(instances)) } else { - err = fmt.Errorf("at least %d live replicas required, could only find %d", minSuccess, len(ingesters)) + err = fmt.Errorf("at least %d live replicas required, could only find %d", minSuccess, len(instances)) } return nil, 0, err } - return ingesters, len(ingesters) - minSuccess, nil + return instances, len(instances) - minSuccess, nil } type ignoreUnhealthyInstancesReplicationStrategy struct{} @@ -90,8 +90,8 @@ func (r *ignoreUnhealthyInstancesReplicationStrategy) Filter(instances []Ingeste return instances, len(instances) - 1, nil } -func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation, now time.Time) bool { - return ingester.IsHealthy(op, r.cfg.HeartbeatTimeout, now) +func (r *Ring) IsHealthy(instance *IngesterDesc, op Operation, now time.Time) bool { + return instance.IsHealthy(op, r.cfg.HeartbeatTimeout, now) } // ReplicationFactor of the ring. @@ -99,8 +99,8 @@ func (r *Ring) ReplicationFactor() int { return r.cfg.ReplicationFactor } -// IngesterCount is number of ingesters in the ring -func (r *Ring) IngesterCount() int { +// InstancesCount returns the number of instances in the ring.
+func (r *Ring) InstancesCount() int { r.mtx.RLock() c := len(r.ringDesc.Ingesters) r.mtx.RUnlock() diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/ring.go b/vendor/github.com/cortexproject/cortex/pkg/ring/ring.go index ecb6501327de..ad24dc31a02e 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ring/ring.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ring/ring.go @@ -46,14 +46,14 @@ const ( type ReadRing interface { prometheus.Collector - // Get returns n (or more) ingesters which form the replicas for the given key. + // Get returns n (or more) instances which form the replicas for the given key. // bufDescs, bufHosts and bufZones are slices to be overwritten for the return value // to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet(). Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error) // GetAllHealthy returns all healthy instances in the ring, for the given operation. // This function doesn't check if the quorum is honored, so doesn't fail if the number - // of unhealthy ingesters is greater than the tolerated max unavailable. + // of unhealthy instances is greater than the tolerated max unavailable. GetAllHealthy(op Operation) (ReplicationSet, error) // GetReplicationSetForOperation returns all instances where the input operation should be executed. @@ -63,7 +63,9 @@ type ReadRing interface { GetReplicationSetForOperation(op Operation) (ReplicationSet, error) ReplicationFactor() int - IngesterCount() int + + // InstancesCount returns the number of instances in the ring. + InstancesCount() int // ShuffleShard returns a subring for the provided identifier (eg. a tenant ID) // and size (number of instances). @@ -78,12 +80,12 @@ type ReadRing interface { } var ( - // Write operation that also extends replica set, if ingester state is not ACTIVE. + // Write operation that also extends replica set, if instance state is not ACTIVE. Write = NewOp([]IngesterState{ACTIVE}, func(s IngesterState) bool { - // We do not want to Write to Ingesters that are not ACTIVE, but we do want + // We do not want to Write to instances that are not ACTIVE, but we do want // to write the extra replica somewhere. So we increase the size of the set // of replicas for the key. - // NB dead ingester will be filtered later by defaultReplicationStrategy.Filter(). + // NB unhealthy instances will be filtered later by defaultReplicationStrategy.Filter(). return s != ACTIVE }) @@ -108,9 +110,9 @@ var ( // not registered within the ring. ErrInstanceNotFound = errors.New("instance not found in the ring") - // ErrTooManyFailedIngesters is the error returned when there are too many failed ingesters for a + // ErrTooManyUnhealthyInstances is the error returned when there are too many failed instances for a // specific operation. - ErrTooManyFailedIngesters = errors.New("too many failed ingesters") + ErrTooManyUnhealthyInstances = errors.New("too many unhealthy instances in the ring") // ErrInconsistentTokensInfo is the error returned if, due to an internal bug, the mapping between // a token and its own instance is missing or unknown. @@ -301,7 +303,7 @@ func (r *Ring) loop(ctx context.Context) error { return nil } -// Get returns n (or more) ingesters which form the replicas for the given key. +// Get returns n (or more) instances which form the replicas for the given key. 
func (r *Ring) Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error) { r.mtx.RLock() defer r.mtx.RUnlock() @@ -311,7 +313,7 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, var ( n = r.cfg.ReplicationFactor - ingesters = bufDescs[:0] + instances = bufDescs[:0] start = searchToken(r.ringTokens, key) iterations = 0 @@ -332,12 +334,12 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, return ReplicationSet{}, ErrInconsistentTokensInfo } - // We want n *distinct* ingesters && distinct zones. + // We want n *distinct* instances && distinct zones. if util.StringsContain(distinctHosts, info.InstanceID) { continue } - // Ignore if the ingesters don't have a zone set. + // Ignore if the instances don't have a zone set. if r.cfg.ZoneAwarenessEnabled && info.Zone != "" { if util.StringsContain(distinctZones, info.Zone) { continue @@ -346,24 +348,24 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, } distinctHosts = append(distinctHosts, info.InstanceID) - ingester := r.ringDesc.Ingesters[info.InstanceID] + instance := r.ringDesc.Ingesters[info.InstanceID] // Check whether the replica set should be extended given we're including // this instance. - if op.ShouldExtendReplicaSetOnState(ingester.State) { + if op.ShouldExtendReplicaSetOnState(instance.State) { n++ } - ingesters = append(ingesters, ingester) + instances = append(instances, instance) } - liveIngesters, maxFailure, err := r.strategy.Filter(ingesters, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled) + healthyInstances, maxFailure, err := r.strategy.Filter(instances, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled) if err != nil { return ReplicationSet{}, err } return ReplicationSet{ - Ingesters: liveIngesters, + Ingesters: healthyInstances, MaxErrors: maxFailure, }, nil } @@ -378,15 +380,15 @@ func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) { } now := time.Now() - ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters)) - for _, ingester := range r.ringDesc.Ingesters { - if r.IsHealthy(&ingester, op, now) { - ingesters = append(ingesters, ingester) + instances := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters)) + for _, instance := range r.ringDesc.Ingesters { + if r.IsHealthy(&instance, op, now) { + instances = append(instances, instance) } } return ReplicationSet{ - Ingesters: ingesters, + Ingesters: instances, MaxErrors: 0, }, nil } @@ -405,11 +407,11 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro zoneFailures := make(map[string]struct{}) now := time.Now() - for _, ingester := range r.ringDesc.Ingesters { - if r.IsHealthy(&ingester, op, now) { - healthyInstances = append(healthyInstances, ingester) + for _, instance := range r.ringDesc.Ingesters { + if r.IsHealthy(&instance, op, now) { + healthyInstances = append(healthyInstances, instance) } else { - zoneFailures[ingester.Zone] = struct{}{} + zoneFailures[instance.Zone] = struct{}{} } } @@ -427,19 +429,19 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro maxUnavailableZones = minSuccessZones - 1 if len(zoneFailures) > maxUnavailableZones { - return ReplicationSet{}, ErrTooManyFailedIngesters + return ReplicationSet{}, ErrTooManyUnhealthyInstances } if len(zoneFailures) > 0 { // We remove all instances (even healthy ones) from zones with at least - // 1 failing ingester. 
Due to how replication works when zone-awareness is + // 1 failing instance. Due to how replication works when zone-awareness is // enabled (data is replicated to RF different zones), there's no benefit in // querying healthy instances from "failing zones". A zone is considered // failed if there is a single error. filteredInstances := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters)) - for _, ingester := range healthyInstances { - if _, ok := zoneFailures[ingester.Zone]; !ok { - filteredInstances = append(filteredInstances, ingester) + for _, instance := range healthyInstances { + if _, ok := zoneFailures[instance.Zone]; !ok { + filteredInstances = append(filteredInstances, instance) } } @@ -450,7 +452,7 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro // instance, we have to decrease the max unavailable zones accordingly. maxUnavailableZones -= len(zoneFailures) } else { - // Calculate the number of required ingesters; + // Calculate the number of required instances; // ensure we always require at least RF-1 when RF=3. numRequired := len(r.ringDesc.Ingesters) if numRequired < r.cfg.ReplicationFactor { @@ -460,7 +462,7 @@ numRequired -= r.cfg.ReplicationFactor / 2 if len(healthyInstances) < numRequired { - return ReplicationSet{}, ErrTooManyFailedIngesters + return ReplicationSet{}, ErrTooManyUnhealthyInstances } maxErrors = len(healthyInstances) - numRequired @@ -543,14 +545,14 @@ func (r *Ring) Collect(ch chan<- prometheus.Metric) { oldestTimestampByState[s] = 0 } - for _, ingester := range r.ringDesc.Ingesters { - s := ingester.State.String() - if !r.IsHealthy(&ingester, Reporting, time.Now()) { + for _, instance := range r.ringDesc.Ingesters { + s := instance.State.String() + if !r.IsHealthy(&instance, Reporting, time.Now()) { s = unhealthy } numByState[s]++ - if oldestTimestampByState[s] == 0 || ingester.Timestamp < oldestTimestampByState[s] { - oldestTimestampByState[s] = ingester.Timestamp + if oldestTimestampByState[s] == 0 || instance.Timestamp < oldestTimestampByState[s] { + oldestTimestampByState[s] = instance.Timestamp } } @@ -599,7 +601,7 @@ func (r *Ring) Collect(ch chan<- prometheus.Metric) { // set of instances, with a reduced number of overlapping instances between two identifiers. func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { // Nothing to do if the shard size is not smaller than the actual ring. - if size <= 0 || r.IngesterCount() <= size { + if size <= 0 || r.InstancesCount() <= size { return r } @@ -622,7 +624,7 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { // This function doesn't support caching. func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing { // Nothing to do if the shard size is not smaller than the actual ring. - if size <= 0 || r.IngesterCount() <= size { + if size <= 0 || r.InstancesCount() <= size { return r } diff --git a/vendor/github.com/cortexproject/cortex/pkg/ruler/ruler.go b/vendor/github.com/cortexproject/cortex/pkg/ruler/ruler.go index f4b93fb52114..d5ac2fe70d2b 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ruler/ruler.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ruler/ruler.go @@ -64,7 +64,7 @@ type Config struct { // This is used for template expansion in alerts; must be a valid URL. ExternalURL flagext.URLValue `yaml:"external_url"` // GRPC Client configuration.
- ClientTLSConfig grpcclient.ConfigWithTLS `yaml:"ruler_client"` + ClientTLSConfig grpcclient.Config `yaml:"ruler_client"` // How frequently to evaluate rules by default. EvaluationInterval time.Duration `yaml:"evaluation_interval"` // How frequently to poll for updated rules. diff --git a/vendor/github.com/cortexproject/cortex/pkg/scheduler/scheduler.go b/vendor/github.com/cortexproject/cortex/pkg/scheduler/scheduler.go index 5a0603bc6c45..7b18f5f3a51a 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/scheduler/scheduler.go +++ b/vendor/github.com/cortexproject/cortex/pkg/scheduler/scheduler.go @@ -74,7 +74,7 @@ type connectedFrontend struct { type Config struct { MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant"` - GRPCClientConfig grpcclient.ConfigWithTLS `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."` + GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/grpcclient/grpcclient.go b/vendor/github.com/cortexproject/cortex/pkg/util/grpcclient/grpcclient.go index c518bbc09f36..e876804c4b41 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/util/grpcclient/grpcclient.go +++ b/vendor/github.com/cortexproject/cortex/pkg/util/grpcclient/grpcclient.go @@ -26,6 +26,9 @@ type Config struct { BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"` BackoffConfig util.BackoffConfig `yaml:"backoff_config"` + + TLSEnabled bool `yaml:"tls_enabled"` + TLS tls.ClientConfig `yaml:",inline"` } // RegisterFlags registers flags. @@ -41,8 +44,11 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.") f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.") f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit ratelimits.") + f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", cfg.TLSEnabled, "Enable TLS in the GRPC client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to gRPC server will be used.") cfg.BackoffConfig.RegisterFlags(prefix, f) + + cfg.TLS.RegisterFlagsWithPrefix(prefix, f) } func (cfg *Config) Validate(log log.Logger) error { @@ -67,7 +73,14 @@ func (cfg *Config) CallOptions() []grpc.CallOption { } // DialOption returns the config as a grpc.DialOptions. -func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) []grpc.DialOption { +func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) { + var opts []grpc.DialOption + tlsOpts, err := cfg.TLS.GetGRPCDialOptions(cfg.TLSEnabled) + if err != nil { + return nil, err + } + opts = append(opts, tlsOpts...) + if cfg.BackoffOnRatelimits { unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewBackoffRetry(cfg.BackoffConfig)}, unaryClientInterceptors...) 
} @@ -76,7 +89,8 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewRateLimiter(cfg)}, unaryClientInterceptors...) } - return []grpc.DialOption{ + return append( + opts, grpc.WithDefaultCallOptions(cfg.CallOptions()...), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(unaryClientInterceptors...)), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(streamClientInterceptors...)), @@ -85,31 +99,5 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep Timeout: time.Second * 10, PermitWithoutStream: true, }), - } -} - -// ConfigWithTLS is the config for a grpc client with tls -type ConfigWithTLS struct { - GRPC Config `yaml:",inline"` - TLS tls.ClientConfig `yaml:",inline"` -} - -// RegisterFlagsWithPrefix registers flags with prefix. -func (cfg *ConfigWithTLS) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - cfg.GRPC.RegisterFlagsWithPrefix(prefix, f) - cfg.TLS.RegisterFlagsWithPrefix(prefix, f) -} - -func (cfg *ConfigWithTLS) Validate(log log.Logger) error { - return cfg.GRPC.Validate(log) -} - -// DialOption returns the config as a grpc.DialOptions -func (cfg *ConfigWithTLS) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) { - opts, err := cfg.TLS.GetGRPCDialOptions() - if err != nil { - return nil, err - } - - return append(opts, cfg.GRPC.DialOption(unaryClientInterceptors, streamClientInterceptors)...), nil + ), nil } diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/push/push.go b/vendor/github.com/cortexproject/cortex/pkg/util/push/push.go index 93ca2d70e7fa..ed7829b19e63 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/util/push/push.go +++ b/vendor/github.com/cortexproject/cortex/pkg/util/push/push.go @@ -8,14 +8,16 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" - "github.com/cortexproject/cortex/pkg/distributor" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/log" ) +// Func defines the type of the push. It is similar to http.HandlerFunc. +type Func func(context.Context, *client.WriteRequest) (*client.WriteResponse, error) + // Handler is a http.Handler which accepts WriteRequests. 
-func Handler(cfg distributor.Config, sourceIPs *middleware.SourceIPExtractor, push func(context.Context, *client.WriteRequest) (*client.WriteResponse, error)) http.Handler { +func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() logger := log.WithContext(ctx, log.Logger) @@ -27,7 +29,7 @@ func Handler(cfg distributor.Config, sourceIPs *middleware.SourceIPExtractor, pu } } var req client.PreallocWriteRequest - err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), cfg.MaxRecvMsgSize, &req, util.RawSnappy) + err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy) if err != nil { level.Error(logger).Log("err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/tls/tls.go b/vendor/github.com/cortexproject/cortex/pkg/util/tls/tls.go index 28b36941dfca..9886b208ddce 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/util/tls/tls.go +++ b/vendor/github.com/cortexproject/cortex/pkg/util/tls/tls.go @@ -16,6 +16,7 @@ type ClientConfig struct { CertPath string `yaml:"tls_cert_path"` KeyPath string `yaml:"tls_key_path"` CAPath string `yaml:"tls_ca_path"` + ServerName string `yaml:"tls_server_name"` InsecureSkipVerify bool `yaml:"tls_insecure_skip_verify"` } @@ -29,18 +30,15 @@ func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) f.StringVar(&cfg.CertPath, prefix+".tls-cert-path", "", "Path to the client certificate file, which will be used for authenticating with the server. Also requires the key path to be configured.") f.StringVar(&cfg.KeyPath, prefix+".tls-key-path", "", "Path to the key file for the client certificate. Also requires the client certificate to be configured.") f.StringVar(&cfg.CAPath, prefix+".tls-ca-path", "", "Path to the CA certificates file to validate server certificate against. 
If not set, the host's root CA certificates are used.") + f.StringVar(&cfg.ServerName, prefix+".tls-server-name", "", "Override the expected name on the server certificate.") f.BoolVar(&cfg.InsecureSkipVerify, prefix+".tls-insecure-skip-verify", false, "Skip validating server certificate.") } // GetTLSConfig initialises tls.Config from config options func (cfg *ClientConfig) GetTLSConfig() (*tls.Config, error) { - // no tls config given at all - if cfg.CertPath == "" && cfg.KeyPath == "" && cfg.CAPath == "" { - return nil, nil - } - config := &tls.Config{ InsecureSkipVerify: cfg.InsecureSkipVerify, + ServerName: cfg.ServerName, } // read ca certificates @@ -75,11 +73,15 @@ func (cfg *ClientConfig) GetTLSConfig() (*tls.Config, error) { } // GetGRPCDialOptions creates GRPC DialOptions for TLS -func (cfg *ClientConfig) GetGRPCDialOptions() ([]grpc.DialOption, error) { - if tlsConfig, err := cfg.GetTLSConfig(); err != nil { +func (cfg *ClientConfig) GetGRPCDialOptions(enabled bool) ([]grpc.DialOption, error) { + if !enabled { + return []grpc.DialOption{grpc.WithInsecure()}, nil + } + + tlsConfig, err := cfg.GetTLSConfig() + if err != nil { return nil, errors.Wrap(err, "error creating grpc dial options") - } else if tlsConfig != nil { - return []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))}, nil } - return []grpc.DialOption{grpc.WithInsecure()}, nil + + return []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))}, nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index 9e081b4178a0..d2e3325ad824 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -164,7 +164,7 @@ github.com/coreos/go-systemd/journal github.com/coreos/go-systemd/sdjournal # github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f github.com/coreos/pkg/capnslog -# github.com/cortexproject/cortex v1.6.1-0.20210128165026-dc1e6a800b51 +# github.com/cortexproject/cortex v1.6.1-0.20210129172402-0976147451ee ## explicit github.com/cortexproject/cortex/pkg/alertmanager github.com/cortexproject/cortex/pkg/alertmanager/alerts
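For downstream projects such as Loki, the api.go change earlier in this patch is the interesting seam: Config.DistributorPushWrapper lets them observe the deserialized write request around the distributor push. A minimal sketch of how a consumer might use the hook — the wrapper name and body are hypothetical; only the types (push.Func, DistributorPushWrapper) come from the patch:

package main

import (
	"context"

	"github.com/cortexproject/cortex/pkg/api"
	"github.com/cortexproject/cortex/pkg/ingester/client"
	"github.com/cortexproject/cortex/pkg/util/push"
)

// inspectingPushWrapper is a hypothetical wrapper: it receives the
// deserialized WriteRequest before the real distributor push runs,
// which is the access the new config hook is documented to provide.
func inspectingPushWrapper(next push.Func) push.Func {
	return func(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
		_ = len(req.Timeseries) // e.g. record per-request series counts here
		return next(ctx, req)
	}
}

func main() {
	var cfg api.Config
	cfg.DistributorPushWrapper = inspectingPushWrapper
	// cfg would then be passed to the Cortex API constructor as usual;
	// wrapDistributorPush applies the wrapper when registering /api/v1/push.
}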