diff --git a/CHANGELOG.md b/CHANGELOG.md
index ce185695c864..f2c1eb712f07 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@
 * [4543](https://github.com/grafana/loki/pull/4543) **trevorwhitney**: Change more default values and improve application of common storage config
 * [4570](https://github.com/grafana/loki/pull/4570) **DylanGuedes**: Loki: Append loopback to ingester net interface default list
 * [4594](https://github.com/grafana/loki/pull/4594) **owen-d**: Configures unordered_writes=true by default
+* [4574](https://github.com/grafana/loki/pull/4574) **slim-bean**: Loki: Add a ring to the compactor used to control concurrency when not running standalone
 
 # 2.3.0 (2021/08/06)
 
diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md
index 6c142d77a2f1..b4a9d219cfd2 100644
--- a/docs/sources/configuration/_index.md
+++ b/docs/sources/configuration/_index.md
@@ -1903,33 +1903,126 @@ compacts index shards to more performant forms.
 
 ```yaml
 # Directory where files can be downloaded for compaction.
+# CLI flag: -boltdb.shipper.compactor.working-directory
 [working_directory: <string>]
 
 # The shared store used for storing boltdb files.
 # Supported types: gcs, s3, azure, swift, filesystem.
+# CLI flag: -boltdb.shipper.compactor.shared-store
 [shared_store: <string>]
 
 # Prefix to add to object keys in shared store.
 # Path separator(if any) should always be a '/'.
 # Prefix should never start with a separator but should always end with it.
+# CLI flag: -boltdb.shipper.compactor.shared-store.key-prefix
 [shared_store_key_prefix: <string> | default = "index/"]
 
 # Interval at which to re-run the compaction operation (or retention if enabled).
+# CLI flag: -boltdb.shipper.compactor.compaction-interval
 [compaction_interval: <duration> | default = 10m]
 
 # (Experimental) Activate custom (per-stream,per-tenant) retention.
+# CLI flag: -boltdb.shipper.compactor.retention-enabled
 [retention_enabled: <boolean> | default = false]
 
 # Delay after which chunks will be fully deleted during retention.
+# CLI flag: -boltdb.shipper.compactor.retention-delete-delay
 [retention_delete_delay: <duration> | default = 2h]
 
 # The total amount of worker to use to delete chunks.
+# CLI flag: -boltdb.shipper.compactor.retention-delete-worker-count
 [retention_delete_worker_count: <int> | default = 150]
 
 # Allow cancellation of delete request until duration after they are created.
 # Data would be deleted only after delete requests have been older than this duration.
 # Ideally this should be set to at least 24h.
+# CLI flag: -boltdb.shipper.compactor.delete-request-cancel-period
 [delete_request_cancel_period: <duration> | default = 24h]
+
+# Maximum number of tables to compact in parallel.
+# While increasing this value, please make sure compactor has enough disk space
+# allocated to be able to store and compact as many tables.
+# CLI flag: -boltdb.shipper.compactor.max-compaction-parallelism
+[max_compaction_parallelism: <int> | default = 1]
+
+# The hash ring configuration used by compactors to elect a single instance for running compactions
+compactor_ring:
+  # The key-value store used to share the hash ring across multiple instances.
+  kvstore:
+    # Backend storage to use for the ring. Supported values are: consul, etcd,
+    # inmemory, memberlist, multi.
+    # CLI flag: -boltdb.shipper.compactor.ring.store
+    [store: <string> | default = "memberlist"]
+
+    # The prefix for the keys in the store. Should end with a /.
+    # CLI flag: -boltdb.shipper.compactor.ring.prefix
+    [prefix: <string> | default = "compactors/"]
+
+    # The consul_config configures the consul client.
+    # The CLI flags prefix for this block config is: boltdb.shipper.compactor.ring
+    [consul: <consul_config>]
+
+    # The etcd_config configures the etcd client.
+    # The CLI flags prefix for this block config is: boltdb.shipper.compactor.ring
+    [etcd: <etcd_config>]
+
+    multi:
+      # Primary backend storage used by multi-client.
+      # CLI flag: -boltdb.shipper.compactor.ring.multi.primary
+      [primary: <string> | default = ""]
+
+      # Secondary backend storage used by multi-client.
+      # CLI flag: -boltdb.shipper.compactor.ring.multi.secondary
+      [secondary: <string> | default = ""]
+
+      # Mirror writes to secondary store.
+      # CLI flag: -boltdb.shipper.compactor.ring.multi.mirror-enabled
+      [mirror_enabled: <boolean> | default = false]
+
+      # Timeout for storing value to secondary store.
+      # CLI flag: -boltdb.shipper.compactor.ring.multi.mirror-timeout
+      [mirror_timeout: <duration> | default = 2s]
+
+  # Interval between heartbeats sent to the ring. 0 = disabled.
+  # CLI flag: -boltdb.shipper.compactor.ring.heartbeat-period
+  [heartbeat_period: <duration> | default = 15s]
+
+  # The heartbeat timeout after which compactors are considered unhealthy
+  # within the ring.
+  # 0 = never (timeout disabled).
+  # CLI flag: -boltdb.shipper.compactor.ring.heartbeat-timeout
+  [heartbeat_timeout: <duration> | default = 1m]
+
+  # File path where tokens are stored. If empty, tokens are neither stored at
+  # shutdown nor restored at startup.
+  # CLI flag: -boltdb.shipper.compactor.ring.tokens-file-path
+  [tokens_file_path: <string> | default = ""]
+
+  # True to enable zone-awareness and replicate blocks across different
+  # availability zones.
+  # CLI flag: -boltdb.shipper.compactor.ring.zone-awareness-enabled
+  [zone_awareness_enabled: <boolean> | default = false]
+
+  # Name of network interface to read addresses from.
+  # CLI flag: -boltdb.shipper.compactor.ring.instance-interface-names
+  [instance_interface_names: <list of string> | default = [eth0 en0]]
+
+  # IP address to advertise in the ring.
+  # CLI flag: -boltdb.shipper.compactor.ring.instance-addr
+  [instance_addr: <string> | default = first from instance_interface_names]
+
+  # Port to advertise in the ring.
+  # CLI flag: -boltdb.shipper.compactor.ring.instance-port
+  [instance_port: <int> | default = server.grpc-listen-port]
+
+  # Instance ID to register in the ring.
+  # CLI flag: -boltdb.shipper.compactor.ring.instance-id
+  [instance_id: <string> | default = os.Hostname()]
+
+  # The availability zone where this instance is running. Required if
+  # zone-awareness is enabled.
+  # CLI flag: -boltdb.shipper.compactor.ring.instance-availability-zone
+  [instance_availability_zone: <string> | default = ""]
 ```
 
 ## limits_config
 
@@ -2317,6 +2410,10 @@ This way, one doesn't have to replicate configs in multiple places.
 
 # When defined, the given prefix will be present in front of the endpoint paths.
 [path_prefix: <string>]
+
+# When true, the ingester, compactor, and query_scheduler ring tokens will be saved to files in the path_prefix directory.
+# Loki will error if you set this to true and path_prefix is empty.
+[persist_tokens: <boolean> | default = false]
 ```
 
 ### common_storage_config
 
diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go
index d85d01d0cbda..f791cac2fe24 100644
--- a/pkg/loki/common/common.go
+++ b/pkg/loki/common/common.go
@@ -12,8 +12,9 @@ import (
 
 // Config holds common config that can be shared between multiple other config sections
 type Config struct {
-	PathPrefix string  `yaml:"path_prefix"`
-	Storage    Storage `yaml:"storage"`
+	PathPrefix    string  `yaml:"path_prefix"`
+	Storage       Storage `yaml:"storage"`
+	PersistTokens bool    `yaml:"persist_tokens"`
 }
 
 func (c *Config) RegisterFlags(f *flag.FlagSet) {
diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go
index c64d78cb1a50..c6519a59d79d 100644
--- a/pkg/loki/config_wrapper.go
+++ b/pkg/loki/config_wrapper.go
@@ -77,7 +77,9 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
 		}
 
 		applyPathPrefixDefaults(r, defaults)
-		applyIngesterRingConfig(r, &defaults)
+		if err := applyIngesterRingConfig(r, &defaults); err != nil {
+			return err
+		}
 		applyMemberlistConfig(r)
 
 		if err := applyStorageConfig(r, &defaults); err != nil {
@@ -101,7 +103,7 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
 //
 // If the ingester ring has its interface names sets to a value equal to the default (["eth0", en0"]), it will try to append
 // the loopback interface at the end of it.
-func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) {
+func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) error {
 	if reflect.DeepEqual(r.Ingester.LifecyclerConfig.InfNames, defaults.Ingester.LifecyclerConfig.InfNames) {
 		appendLoopbackInterface(r)
 	}
@@ -111,6 +113,12 @@ func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) {
 	s := rc.KVStore.Store
 	sc := r.Ingester.LifecyclerConfig.RingConfig.KVStore.StoreConfig
 
+	f, err := tokensFile(r, "ingester.tokens")
+	if err != nil {
+		return err
+	}
+	r.Ingester.LifecyclerConfig.TokensFilePath = f
+
 	// This gets ugly because we use a separate struct for configuring each ring...
 
 	// Distributor
@@ -143,9 +151,46 @@ func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) {
 	r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = lc.InfNames
 	r.QueryScheduler.SchedulerRing.InstanceZone = lc.Zone
 	r.QueryScheduler.SchedulerRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
-	r.QueryScheduler.SchedulerRing.TokensFilePath = lc.TokensFilePath
 	r.QueryScheduler.SchedulerRing.KVStore.Store = s
 	r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = sc
+	f, err = tokensFile(r, "scheduler.tokens")
+	if err != nil {
+		return err
+	}
+	r.QueryScheduler.SchedulerRing.TokensFilePath = f
+
+	// Compactor
+	r.CompactorConfig.CompactorRing.HeartbeatTimeout = rc.HeartbeatTimeout
+	r.CompactorConfig.CompactorRing.HeartbeatPeriod = lc.HeartbeatPeriod
+	r.CompactorConfig.CompactorRing.InstancePort = lc.Port
+	r.CompactorConfig.CompactorRing.InstanceAddr = lc.Addr
+	r.CompactorConfig.CompactorRing.InstanceID = lc.ID
+	r.CompactorConfig.CompactorRing.InstanceInterfaceNames = lc.InfNames
+	r.CompactorConfig.CompactorRing.InstanceZone = lc.Zone
+	r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
+	r.CompactorConfig.CompactorRing.KVStore.Store = s
+	r.CompactorConfig.CompactorRing.KVStore.StoreConfig = sc
+	f, err = tokensFile(r, "compactor.tokens")
+	if err != nil {
+		return err
+	}
+	r.CompactorConfig.CompactorRing.TokensFilePath = f
+	return nil
+}
+
+// tokensFile will create a tokens file with the provided name in the common config path_prefix directory
+// if and only if:
+// * the common config persist_tokens == true
+// * the common config path_prefix is defined.
+func tokensFile(cfg *ConfigWrapper, file string) (string, error) {
+	if !cfg.Common.PersistTokens {
+		return "", nil
+	}
+	if cfg.Common.PathPrefix == "" {
+		return "", errors.New("if persist_tokens is true, path_prefix MUST be defined")
+	}
+
+	return cfg.Common.PathPrefix + "/" + file, nil
 }
 
 func applyPathPrefixDefaults(r *ConfigWrapper, defaults ConfigWrapper) {
@@ -184,6 +229,7 @@ func applyMemberlistConfig(r *ConfigWrapper) {
 		r.Distributor.DistributorRing.KVStore.Store = memberlistStr
 		r.Ruler.Ring.KVStore.Store = memberlistStr
 		r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr
+		r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr
 	}
 }
 
diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go
index 2dad9c47d5c4..0a103a91c5e3 100644
--- a/pkg/loki/config_wrapper_test.go
+++ b/pkg/loki/config_wrapper_test.go
@@ -20,7 +20,9 @@ import (
 	cortex_local "github.com/cortexproject/cortex/pkg/ruler/rulestore/local"
 	cortex_swift "github.com/cortexproject/cortex/pkg/storage/bucket/swift"
 
+	"github.com/grafana/loki/pkg/loki/common"
 	"github.com/grafana/loki/pkg/storage/chunk/storage"
+	"github.com/grafana/loki/pkg/util"
 	"github.com/grafana/loki/pkg/util/cfg"
 	loki_net "github.com/grafana/loki/pkg/util/net"
 )
@@ -910,9 +912,62 @@ func TestDefaultUnmarshal(t *testing.T) {
 
 func Test_applyIngesterRingConfig(t *testing.T) {
-	msgf := "%s has changed, this is a crude attempt to catch mapping errors missed in config_wrapper.applyIngesterRingConfig when a ring config changes. Please add a new mapping and update the expected value in this test."
+	t.Run("Attempt to catch changes to a RingConfig", func(t *testing.T) {
+		msgf := "%s has changed, this is a crude attempt to catch mapping errors missed in config_wrapper.applyIngesterRingConfig when a ring config changes. Please add a new mapping and update the expected value in this test."
 
-	assert.Equal(t, 8, reflect.TypeOf(distributor.RingConfig{}).NumField(), fmt.Sprintf(msgf, reflect.TypeOf(distributor.RingConfig{}).String()))
+		assert.Equal(t, 8, reflect.TypeOf(distributor.RingConfig{}).NumField(), fmt.Sprintf(msgf, reflect.TypeOf(distributor.RingConfig{}).String()))
+		assert.Equal(t, 12, reflect.TypeOf(util.RingConfig{}).NumField(), fmt.Sprintf(msgf, reflect.TypeOf(util.RingConfig{}).String()))
+	})
+
+	t.Run("compactor and scheduler tokens file should not be configured if persist_tokens is false", func(t *testing.T) {
+		yamlContent := `
+common:
+  path_prefix: /loki
+`
+		config, _, err := configWrapperFromYAML(t, yamlContent, []string{})
+		assert.NoError(t, err)
+
+		assert.Equal(t, "", config.Ingester.LifecyclerConfig.TokensFilePath)
+		assert.Equal(t, "", config.CompactorConfig.CompactorRing.TokensFilePath)
+		assert.Equal(t, "", config.QueryScheduler.SchedulerRing.TokensFilePath)
+	})
+
+	t.Run("tokens files should be set from common config when persist_tokens is true and path_prefix is defined", func(t *testing.T) {
+		yamlContent := `
+common:
+  persist_tokens: true
+  path_prefix: /loki
+`
+		config, _, err := configWrapperFromYAML(t, yamlContent, []string{})
+		assert.NoError(t, err)
+
+		assert.Equal(t, "/loki/ingester.tokens", config.Ingester.LifecyclerConfig.TokensFilePath)
+		assert.Equal(t, "/loki/compactor.tokens", config.CompactorConfig.CompactorRing.TokensFilePath)
+		assert.Equal(t, "/loki/scheduler.tokens", config.QueryScheduler.SchedulerRing.TokensFilePath)
+	})
+
+	t.Run("common config ignored if actual values set", func(t *testing.T) {
+		yamlContent := `
+ingester:
+  lifecycler:
+    tokens_file_path: /loki/toookens
+compactor:
+  compactor_ring:
+    tokens_file_path: /foo/tokens
+query_scheduler:
+  scheduler_ring:
+    tokens_file_path: /sched/tokes
+common:
+  persist_tokens: true
+  path_prefix: /loki
+`
+		config, _, err := configWrapperFromYAML(t, yamlContent, []string{})
+		assert.NoError(t, err)
+
+		assert.Equal(t, "/loki/toookens", config.Ingester.LifecyclerConfig.TokensFilePath)
+		assert.Equal(t, "/foo/tokens", config.CompactorConfig.CompactorRing.TokensFilePath)
+		assert.Equal(t, "/sched/tokes", config.QueryScheduler.SchedulerRing.TokensFilePath)
+	})
 }
 
@@ -1015,3 +1070,29 @@ func TestLoopbackAppendingToFrontendV2(t *testing.T) {
 		assert.Equal(t, []string{"otheriface"}, config.Frontend.FrontendV2.InfNames)
 	})
 }
+
+func Test_tokensFile(t *testing.T) {
+	tests := []struct {
+		name    string
+		cfg     *ConfigWrapper
+		file    string
+		want    string
+		wantErr bool
+	}{
+		{"persist_tokens false, path_prefix empty", &ConfigWrapper{Config: Config{Common: common.Config{PathPrefix: "", PersistTokens: false}}}, "ingester.tokens", "", false},
+		{"persist_tokens true, path_prefix empty", &ConfigWrapper{Config: Config{Common: common.Config{PathPrefix: "", PersistTokens: true}}}, "ingester.tokens", "", true},
+		{"persist_tokens true, path_prefix set", &ConfigWrapper{Config: Config{Common: common.Config{PathPrefix: "/loki", PersistTokens: true}}}, "ingester.tokens", "/loki/ingester.tokens", false},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := tokensFile(tt.cfg, tt.file)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("tokensFile() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if got != tt.want {
+				t.Errorf("tokensFile() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go
index 6b360962f191..abba9c7c713c 100644
--- a/pkg/loki/loki.go
+++ b/pkg/loki/loki.go
@@ -453,8 +453,8 @@ func (t *Loki) setupModuleManager() error {
 		Compactor:       {Server, Overrides},
 		IndexGateway:    {Server},
 		IngesterQuerier: {Ring},
-		All:             {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler},
-		Read:            {QueryScheduler, QueryFrontend, Querier, Ruler},
+		All:             {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor},
+		Read:            {QueryScheduler, QueryFrontend, Querier, Ruler, Compactor},
 		Write:           {Ingester, Distributor},
 	}
 
@@ -463,12 +463,6 @@ func (t *Loki) setupModuleManager() error {
 		deps[Store] = append(deps[Store], IngesterQuerier)
 	}
 
-	// If we are running Loki with boltdb-shipper as a single binary, without clustered mode(which should always be the case when using inmemory ring),
-	// we should start compactor as well for better user experience.
-	if storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Store == "inmemory" {
-		deps[All] = append(deps[All], Compactor)
-	}
-
 	// If the query scheduler and querier are running together, make sure the scheduler goes
 	// first to initialize the ring that will also be used by the querier
 	if (t.Cfg.isModuleEnabled(Querier) && t.Cfg.isModuleEnabled(QueryScheduler)) || t.Cfg.isModuleEnabled(Read) || t.Cfg.isModuleEnabled(All) {
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index d8cf36ea2387..b836f6de75b2 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -652,6 +652,15 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
 }
 
 func (t *Loki) initCompactor() (services.Service, error) {
+	// Set some config sections from other config sections in the config struct
+	t.Cfg.CompactorConfig.CompactorRing.ListenPort = t.Cfg.Server.GRPCListenPort
+	t.Cfg.CompactorConfig.CompactorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
+
+	if !loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
+		level.Info(util_log.Logger).Log("msg", "Not using boltdb-shipper index, not starting compactor")
+		return nil, nil
+	}
+
 	err := t.Cfg.SchemaConfig.Load()
 	if err != nil {
 		return nil, err
@@ -661,6 +670,7 @@ func (t *Loki) initCompactor() (services.Service, error) {
 		return nil, err
 	}
 
+	t.Server.HTTP.Handle("/compactor/ring", t.compactor)
 	if t.Cfg.CompactorConfig.RetentionEnabled {
 		t.Server.HTTP.Path("/loki/api/admin/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
 		t.Server.HTTP.Path("/loki/api/admin/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 3de20ecbe5e3..7f05131204a8 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -30,6 +30,7 @@ import (
 	"github.com/weaveworks/common/user"
 	"google.golang.org/grpc"
 
+	lokiutil "github.com/grafana/loki/pkg/util"
 	lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
 )
 
@@ -41,6 +42,19 @@ const (
 	// ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance
 	// in the ring will be automatically removed.
 	ringAutoForgetUnhealthyPeriods = 10
+
+	// ringKey is the key under which we store the schedulers ring in the KVStore.
+	ringKey = "scheduler"
+
+	// ringNameForServer is the name of the ring used by the scheduler server.
+	ringNameForServer = "scheduler"
+
+	// ringReplicationFactor should be 2 because we want 2 schedulers.
+	ringReplicationFactor = 2
+
+	// ringNumTokens sets our single token in the ring,
+	// we only need to insert 1 token to be used for leader election purposes.
+	ringNumTokens = 1
 )
 
 // Scheduler is responsible for queueing and dispatching queries to Queriers.
@@ -96,8 +110,8 @@ type Config struct {
 	QuerierForgetDelay time.Duration     `yaml:"-"`
 	GRPCClientConfig   grpcclient.Config `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."`
 	// Schedulers ring
-	UseSchedulerRing bool       `yaml:"use_scheduler_ring"`
-	SchedulerRing    RingConfig `yaml:"scheduler_ring,omitempty"`
+	UseSchedulerRing bool                `yaml:"use_scheduler_ring"`
+	SchedulerRing    lokiutil.RingConfig `yaml:"scheduler_ring,omitempty"`
 }
 
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -107,7 +121,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	cfg.QuerierForgetDelay = 0
 	cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f)
 	f.BoolVar(&cfg.UseSchedulerRing, "query-scheduler.use-scheduler-ring", false, "Set to true to have the query scheduler create a ring and the frontend and frontend_worker use this ring to get the addresses of the query schedulers. If frontend_address and scheduler_address are not present in the config this value will be toggle by Loki to true")
-	cfg.SchedulerRing.RegisterFlags(f)
+	cfg.SchedulerRing.RegisterFlagsWithPrefix("query-scheduler.", "schedulers/", f)
 }
 
 // NewScheduler creates a new Scheduler.
@@ -160,7 +174,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
 	if err != nil {
 		return nil, errors.Wrap(err, "create KV store client")
 	}
-	lifecyclerCfg, err := cfg.SchedulerRing.ToLifecyclerConfig()
+	lifecyclerCfg, err := cfg.SchedulerRing.ToLifecyclerConfig(ringNumTokens)
 	if err != nil {
 		return nil, errors.Wrap(err, "invalid ring lifecycler config")
 	}
@@ -172,13 +186,13 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
 	delegate = ring.NewTokensPersistencyDelegate(cfg.SchedulerRing.TokensFilePath, ring.JOINING, delegate, log)
 	delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.SchedulerRing.HeartbeatTimeout, delegate, log)
 
-	s.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, RingNameForServer, RingKey, ringStore, delegate, log, registerer)
+	s.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, ringKey, ringStore, delegate, log, registerer)
 	if err != nil {
 		return nil, errors.Wrap(err, "create ring lifecycler")
 	}
 
-	ringCfg := cfg.SchedulerRing.ToRingConfig()
-	s.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, RingNameForServer, RingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy())
+	ringCfg := cfg.SchedulerRing.ToRingConfig(ringReplicationFactor)
+	s.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, ringNameForServer, ringKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy())
 	if err != nil {
 		return nil, errors.Wrap(err, "create ring client")
 	}
@@ -656,7 +670,7 @@ func (s *Scheduler) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc rin
 	}
 
 	takenTokens := ringDesc.GetTokens()
-	newTokens := ring.GenerateTokens(RingNumTokens-len(tokens), takenTokens)
+	newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens)
 
 	// Tokens sorting will be enforced by the parent caller.
 	tokens = append(tokens, newTokens...)
diff --git a/pkg/scheduler/scheduler_ring.go b/pkg/scheduler/scheduler_ring.go
deleted file mode 100644
index 8f41a6138d46..000000000000
--- a/pkg/scheduler/scheduler_ring.go
+++ /dev/null
@@ -1,110 +0,0 @@
-package scheduler
-
-import (
-	"flag"
-	"fmt"
-	"os"
-	"time"
-
-	"github.com/go-kit/log/level"
-	"github.com/grafana/dskit/flagext"
-	"github.com/grafana/dskit/kv"
-
-	"github.com/cortexproject/cortex/pkg/ring"
-	util_log "github.com/cortexproject/cortex/pkg/util/log"
-)
-
-const (
-	// RingKey is the key under which we store the store gateways ring in the KVStore.
-	RingKey = "scheduler"
-
-	// RingNameForServer is the name of the ring used by the store gateway server.
-	RingNameForServer = "scheduler"
-
-	// RingNameForClient is the name of the ring used by the store gateway client (we need
-	// a different name to avoid clashing Prometheus metrics when running in single-binary).
-	RingNameForClient = "scheduler-client"
-
-	// We use a safe default instead of exposing to config option to the user
-	// in order to simplify the config.
-	RingNumTokens = 512
-)
-
-// RingConfig masks the ring lifecycler config which contains
-// many options not really required by the distributors ring. This config
-// is used to strip down the config to the minimum, and avoid confusion
-// to the user.
-type RingConfig struct {
-	KVStore              kv.Config     `yaml:"kvstore"`
-	HeartbeatPeriod      time.Duration `yaml:"heartbeat_period"`
-	HeartbeatTimeout     time.Duration `yaml:"heartbeat_timeout"`
-	TokensFilePath       string        `yaml:"tokens_file_path"`
-	ZoneAwarenessEnabled bool          `yaml:"zone_awareness_enabled"`
-
-	// Instance details
-	InstanceID             string   `yaml:"instance_id"`
-	InstanceInterfaceNames []string `yaml:"instance_interface_names"`
-	InstancePort           int      `yaml:"instance_port"`
-	InstanceAddr           string   `yaml:"instance_addr"`
-	InstanceZone           string   `yaml:"instance_availability_zone"`
-
-	// Injected internally
-	ListenPort int `yaml:"-"`
-
-	ObservePeriod time.Duration `yaml:"-"`
-}
-
-// RegisterFlags adds the flags required to config this to the given FlagSet
-func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
-	hostname, err := os.Hostname()
-	if err != nil {
-		level.Error(util_log.Logger).Log("msg", "failed to get hostname", "err", err)
-		os.Exit(1)
-	}
-
-	// Ring flags
-	cfg.KVStore.RegisterFlagsWithPrefix("scheduler.ring.", "schedulers/", f)
-	f.DurationVar(&cfg.HeartbeatPeriod, "scheduler.ring.heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.")
-	f.DurationVar(&cfg.HeartbeatTimeout, "scheduler.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which schedulers are considered unhealthy within the ring. 0 = never (timeout disabled).")
-	f.StringVar(&cfg.TokensFilePath, "scheduler.ring.tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
-	f.BoolVar(&cfg.ZoneAwarenessEnabled, "scheduler.ring.zone-awareness-enabled", false, "True to enable zone-awareness and replicate blocks across different availability zones.")
-
-	// Instance flags
-	cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
-	f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), "scheduler.ring.instance-interface-names", "Name of network interface to read address from.")
-	f.StringVar(&cfg.InstanceAddr, "scheduler.ring.instance-addr", "", "IP address to advertise in the ring.")
-	f.IntVar(&cfg.InstancePort, "scheduler.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).")
-	f.StringVar(&cfg.InstanceID, "scheduler.ring.instance-id", hostname, "Instance ID to register in the ring.")
-	f.StringVar(&cfg.InstanceZone, "scheduler.ring.instance-availability-zone", "", "The availability zone where this instance is running. Required if zone-awareness is enabled.")
-}
-
-// ToLifecyclerConfig returns a LifecyclerConfig based on the scheduler ring config.
-func (cfg *RingConfig) ToLifecyclerConfig() (ring.BasicLifecyclerConfig, error) {
-	instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames)
-	if err != nil {
-		return ring.BasicLifecyclerConfig{}, err
-	}
-
-	instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort)
-
-	return ring.BasicLifecyclerConfig{
-		ID:                  cfg.InstanceID,
-		Addr:                fmt.Sprintf("%s:%d", instanceAddr, instancePort),
-		Zone:                cfg.InstanceZone,
-		HeartbeatPeriod:     cfg.HeartbeatPeriod,
-		TokensObservePeriod: 0,
-		NumTokens:           RingNumTokens,
-	}, nil
-}
-
-func (cfg *RingConfig) ToRingConfig() ring.Config {
-	rc := ring.Config{}
-	flagext.DefaultValues(&rc)
-
-	rc.KVStore = cfg.KVStore
-	rc.HeartbeatTimeout = cfg.HeartbeatTimeout
-	rc.ZoneAwarenessEnabled = cfg.ZoneAwarenessEnabled
-	rc.ReplicationFactor = 2
-
-	return rc
-}
diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go
index 899c4657f896..81f57ca06b20 100644
--- a/pkg/storage/stores/shipper/compactor/compactor.go
+++ b/pkg/storage/stores/shipper/compactor/compactor.go
@@ -2,16 +2,19 @@ package compactor
 
 import (
 	"context"
-	"errors"
 	"flag"
+	"fmt"
+	"net/http"
 	"path/filepath"
-	"reflect"
 	"sync"
 	"time"
 
+	"github.com/cortexproject/cortex/pkg/ring"
 	util_log "github.com/cortexproject/cortex/pkg/util/log"
 	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/kv"
 	"github.com/grafana/dskit/services"
+	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 
@@ -24,18 +27,42 @@ import (
 	"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
 	shipper_storage "github.com/grafana/loki/pkg/storage/stores/shipper/storage"
 	shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
+	"github.com/grafana/loki/pkg/util"
+)
+
+const (
+	// ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance
+	// in the ring will be automatically removed.
+	ringAutoForgetUnhealthyPeriods = 10
+
+	// ringKey is the key under which we store the compactors ring in the KVStore.
+	ringKey = "compactor"
+
+	// ringNameForServer is the name of the ring used by the compactor server.
+ ringNameForServer = "compactor" + + // ringKeyOfLeader is a somewhat arbitrary ID to pull from the ring to see who will be elected the leader + ringKeyOfLeader = 0 + + // ringReplicationFactor should be 1 because we only want to pull back one node from the Ring + ringReplicationFactor = 1 + + // ringNumTokens sets our single token in the ring, + // we only need to insert 1 token to be used for leader election purposes. + ringNumTokens = 1 ) type Config struct { - WorkingDirectory string `yaml:"working_directory"` - SharedStoreType string `yaml:"shared_store"` - SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"` - CompactionInterval time.Duration `yaml:"compaction_interval"` - RetentionEnabled bool `yaml:"retention_enabled"` - RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"` - RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"` - DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"` - MaxCompactionParallelism int `yaml:"max_compaction_parallelism"` + WorkingDirectory string `yaml:"working_directory"` + SharedStoreType string `yaml:"shared_store"` + SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"` + CompactionInterval time.Duration `yaml:"compaction_interval"` + RetentionEnabled bool `yaml:"retention_enabled"` + RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"` + RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"` + DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"` + MaxCompactionParallelism int `yaml:"max_compaction_parallelism"` + CompactorRing util.RingConfig `yaml:"compactor_ring,omitempty"` } // RegisterFlags registers flags. @@ -49,14 +76,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.") f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.") f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.") + cfg.CompactorRing.RegisterFlagsWithPrefix("boltdb.shipper.compactor.", "compactors/", f) } -func (cfg *Config) IsDefaults() bool { - cpy := &Config{} - cpy.RegisterFlags(flag.NewFlagSet("defaults", flag.ContinueOnError)) - return reflect.DeepEqual(cfg, cpy) -} - +// Validate verifies the config does not contain inappropriate values func (cfg *Config) Validate() error { if cfg.MaxCompactionParallelism < 1 { return errors.New("max compaction parallelism must be >= 1") @@ -76,22 +99,77 @@ type Compactor struct { deleteRequestsManager *deletion.DeleteRequestsManager expirationChecker retention.ExpirationChecker metrics *metrics + running bool + wg sync.WaitGroup + + // Ring used for running a single compactor + ringLifecycler *ring.BasicLifecycler + ring *ring.Ring + ringPollPeriod time.Duration + + // Subservices manager. 
+	subservices        *services.Manager
+	subservicesWatcher *services.FailureWatcher
 }
 
 func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, r prometheus.Registerer) (*Compactor, error) {
-	if cfg.IsDefaults() {
-		return nil, errors.New("Must specify compactor config")
+	if cfg.SharedStoreType == "" {
+		return nil, errors.New("compactor shared_store_type must be specified")
 	}
 
 	compactor := &Compactor{
-		cfg: cfg,
+		cfg:            cfg,
+		ringPollPeriod: 5 * time.Second,
+	}
+
+	ringStore, err := kv.NewClient(
+		cfg.CompactorRing.KVStore,
+		ring.GetCodec(),
+		kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", r), "compactor"),
+		util_log.Logger,
+	)
+	if err != nil {
+		return nil, errors.Wrap(err, "create KV store client")
+	}
+	lifecyclerCfg, err := cfg.CompactorRing.ToLifecyclerConfig(ringNumTokens)
+	if err != nil {
+		return nil, errors.Wrap(err, "invalid ring lifecycler config")
+	}
+
+	// Define lifecycler delegates in reverse order (last to be called defined first because they're
+	// chained via "next delegate").
+	delegate := ring.BasicLifecyclerDelegate(compactor)
+	delegate = ring.NewLeaveOnStoppingDelegate(delegate, util_log.Logger)
+	delegate = ring.NewTokensPersistencyDelegate(cfg.CompactorRing.TokensFilePath, ring.JOINING, delegate, util_log.Logger)
+	delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.CompactorRing.HeartbeatTimeout, delegate, util_log.Logger)
+
+	compactor.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, ringKey, ringStore, delegate, util_log.Logger, r)
+	if err != nil {
+		return nil, errors.Wrap(err, "create ring lifecycler")
 	}
 
+	ringCfg := cfg.CompactorRing.ToRingConfig(ringReplicationFactor)
+	compactor.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, ringNameForServer, ringKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy())
+	if err != nil {
+		return nil, errors.Wrap(err, "create ring client")
+	}
+
+	if r != nil {
+		r.MustRegister(compactor.ring)
+	}
+
+	compactor.subservices, err = services.NewManager(compactor.ringLifecycler, compactor.ring)
+	if err != nil {
+		return nil, err
+	}
+	compactor.subservicesWatcher = services.NewFailureWatcher()
+	compactor.subservicesWatcher.WatchManager(compactor.subservices)
+
 	if err := compactor.init(storageConfig, schemaConfig, limits, r); err != nil {
 		return nil, err
 	}
 
-	compactor.Service = services.NewBasicService(nil, compactor.loop, nil)
+	compactor.Service = services.NewBasicService(compactor.starting, compactor.loop, compactor.stopping)
 	return compactor, nil
 }
 
@@ -143,22 +221,136 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage
 	return nil
 }
 
+func (c *Compactor) starting(ctx context.Context) (err error) {
+	// In case this function will return error we want to unregister the instance
+	// from the ring. We do it ensuring dependencies are gracefully stopped if they
+	// were already started.
+	defer func() {
+		if err == nil || c.subservices == nil {
+			return
+		}
+
+		if stopErr := services.StopManagerAndAwaitStopped(context.Background(), c.subservices); stopErr != nil {
+			level.Error(util_log.Logger).Log("msg", "failed to gracefully stop compactor dependencies", "err", stopErr)
+		}
+	}()
+
+	if err := services.StartManagerAndAwaitHealthy(ctx, c.subservices); err != nil {
+		return errors.Wrap(err, "unable to start compactor subservices")
+	}
+
+	// The BasicLifecycler does not automatically move state to ACTIVE such that any additional work that
+	// someone wants to do can be done before becoming ACTIVE. For the compactor we don't currently
+	// have any additional work so we can become ACTIVE right away.
+
+	// Wait until the ring client detected this instance in the JOINING state to
+	// make sure that when we'll run the initial sync we already know the tokens
+	// assigned to this instance.
+	level.Info(util_log.Logger).Log("msg", "waiting until compactor is JOINING in the ring")
+	if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil {
+		return err
+	}
+	level.Info(util_log.Logger).Log("msg", "compactor is JOINING in the ring")
+
+	// Change ring state to ACTIVE
+	if err = c.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
+		return errors.Wrapf(err, "switch instance to %s in the ring", ring.ACTIVE)
+	}
+
+	// Wait until the ring client detected this instance in the ACTIVE state to
+	// make sure that when we'll run the loop it won't be detected as a ring
+	// topology change.
+	level.Info(util_log.Logger).Log("msg", "waiting until compactor is ACTIVE in the ring")
+	if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
+		return err
+	}
+	level.Info(util_log.Logger).Log("msg", "compactor is ACTIVE in the ring")
+
+	return nil
+}
+
 func (c *Compactor) loop(ctx context.Context) error {
 	if c.cfg.RetentionEnabled {
 		defer c.deleteRequestsStore.Stop()
 		defer c.deleteRequestsManager.Stop()
 	}
 
+	syncTicker := time.NewTicker(c.ringPollPeriod)
+	defer syncTicker.Stop()
+
+	var runningCtx context.Context
+	var runningCancel context.CancelFunc
+
+	for {
+		select {
+		case <-ctx.Done():
+			if runningCancel != nil {
+				runningCancel()
+			}
+			c.wg.Wait()
+			level.Info(util_log.Logger).Log("msg", "compactor exiting")
+			return nil
+		case <-syncTicker.C:
+			bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
+			rs, err := c.ring.Get(ringKeyOfLeader, ring.WriteNoExtend, bufDescs, bufHosts, bufZones)
+			if err != nil {
+				level.Error(util_log.Logger).Log("msg", "error asking ring for who should run the compactor, will check again", "err", err)
+				continue
+			}
+
+			addrs := rs.GetAddresses()
+			if len(addrs) != 1 {
+				level.Error(util_log.Logger).Log("msg", "too many addresses (more than one) returned when asking the ring who should run the compactor, will check again")
+				continue
+			}
+			if c.ringLifecycler.GetInstanceAddr() == addrs[0] {
+				// If not running, start
+				if !c.running {
+					level.Info(util_log.Logger).Log("msg", "this instance has been chosen to run the compactor, starting compactor")
+					runningCtx, runningCancel = context.WithCancel(ctx)
+					go c.runCompactions(runningCtx)
+					c.running = true
+					c.metrics.compactorRunning.Set(1)
+				}
+			} else {
+				// If running, shutdown
+				if c.running {
+					level.Info(util_log.Logger).Log("msg", "this instance should no longer run the compactor, stopping compactor")
+					runningCancel()
+					c.wg.Wait()
+					c.running = false
+					c.metrics.compactorRunning.Set(0)
+					level.Info(util_log.Logger).Log("msg", "compactor stopped")
+				}
+			}
+		}
+	}
+}
+
+func (c *Compactor) runCompactions(ctx context.Context) {
+	// To avoid races, wait 1 compaction interval before actually starting the compactor.
+	// This allows the ring to settle if there are a lot of ring changes and gives
+	// time for existing compactors to shut down before this starts, to avoid
+	// multiple compactors running at the same time.
+	t := time.NewTimer(c.cfg.CompactionInterval)
+	level.Info(util_log.Logger).Log("msg", fmt.Sprintf("waiting %v for ring to stay stable and previous compactions to finish before starting compactor", c.cfg.CompactionInterval))
+	select {
+	case <-ctx.Done():
+		return
+	case <-t.C:
+		level.Info(util_log.Logger).Log("msg", "compactor startup delay completed")
+		break
+	}
+
 	runCompaction := func() {
 		err := c.RunCompaction(ctx)
 		if err != nil {
 			level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err)
 		}
 	}
 
-	var wg sync.WaitGroup
-	wg.Add(1)
+	c.wg.Add(1)
 	go func() {
-		defer wg.Done()
+		defer c.wg.Done()
 		runCompaction()
 
 		ticker := time.NewTicker(c.cfg.CompactionInterval)
@@ -174,20 +366,22 @@ func (c *Compactor) loop(ctx context.Context) error {
 		}
 	}()
 
 	if c.cfg.RetentionEnabled {
-		wg.Add(1)
+		c.wg.Add(1)
 		go func() {
 			// starts the chunk sweeper
 			defer func() {
 				c.sweeper.Stop()
-				wg.Done()
+				c.wg.Done()
 			}()
 			c.sweeper.Start()
 			<-ctx.Done()
 		}()
 	}
+	level.Info(util_log.Logger).Log("msg", "compactor started")
+}
 
-	wg.Wait()
-	return nil
+func (c *Compactor) stopping(_ error) error {
+	return services.StopManagerAndAwaitStopped(context.Background(), c.subservices)
 }
 
 func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {
@@ -221,8 +415,9 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {
 	defer func() {
 		c.metrics.compactTablesOperationTotal.WithLabelValues(status).Inc()
+		runtime := time.Since(start)
 		if status == statusSuccess {
-			c.metrics.compactTablesOperationDurationSeconds.Set(time.Since(start).Seconds())
+			c.metrics.compactTablesOperationDurationSeconds.Set(runtime.Seconds())
 			c.metrics.compactTablesOperationLastSuccess.SetToCurrentTime()
 		}
 
@@ -233,6 +428,9 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {
 				c.expirationChecker.MarkPhaseFailed()
 			}
 		}
+		if runtime > c.cfg.CompactionInterval {
+			level.Warn(util_log.Logger).Log("msg", fmt.Sprintf("last compaction took %s, which is longer than the compaction interval of %s; this can lead to duplicate compactors running if you are not running a standalone compactor instance.", runtime, c.cfg.CompactionInterval))
+		}
 	}()
 
 	tables, err := c.indexStorageClient.ListTables(ctx)
@@ -340,3 +538,30 @@ func (e *expirationChecker) IntervalHasExpiredChunks(interval model.Interval) bo
 
 func (e *expirationChecker) DropFromIndex(ref retention.ChunkEntry, tableEndTime model.Time, now model.Time) bool {
 	return e.retentionExpiryChecker.DropFromIndex(ref, tableEndTime, now) || e.deletionExpiryChecker.DropFromIndex(ref, tableEndTime, now)
 }
+
+func (c *Compactor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
+	// When we initialize the compactor instance in the ring we want to start from
+	// a clean situation, so whatever is the state we set it JOINING, while we keep existing
+	// tokens (if any) or the ones loaded from file.
+	var tokens []uint32
+	if instanceExists {
+		tokens = instanceDesc.GetTokens()
+	}
+
+	takenTokens := ringDesc.GetTokens()
+	newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens)
+
+	// Tokens sorting will be enforced by the parent caller.
+	tokens = append(tokens, newTokens...)
+
+	return ring.JOINING, tokens
+}
+
+func (c *Compactor) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {}
+func (c *Compactor) OnRingInstanceStopping(_ *ring.BasicLifecycler)             {}
+func (c *Compactor) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) {
+}
+
+func (c *Compactor) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	c.ring.ServeHTTP(w, req)
+}
diff --git a/pkg/storage/stores/shipper/compactor/compactor_test.go b/pkg/storage/stores/shipper/compactor/compactor_test.go
index e0cffec23a44..88208ce3b12a 100644
--- a/pkg/storage/stores/shipper/compactor/compactor_test.go
+++ b/pkg/storage/stores/shipper/compactor/compactor_test.go
@@ -2,16 +2,13 @@ package compactor
 
 import (
 	"context"
-	"fmt"
 	"io/ioutil"
 	"os"
 	"path/filepath"
 	"strings"
 	"testing"
-	"time"
 
 	"github.com/grafana/dskit/flagext"
-
 	"github.com/stretchr/testify/require"
 
 	loki_storage "github.com/grafana/loki/pkg/storage"
@@ -27,36 +24,14 @@ func setupTestCompactor(t *testing.T, tempDir string) *Compactor {
 	cfg.SharedStoreType = "filesystem"
 	cfg.RetentionEnabled = false
 
+	require.NoError(t, cfg.Validate())
+
 	c, err := NewCompactor(cfg, storage.Config{FSConfig: local.FSConfig{Directory: tempDir}}, loki_storage.SchemaConfig{}, nil, nil)
 	require.NoError(t, err)
 
 	return c
 }
 
-func TestIsDefaults(t *testing.T) {
-	for i, tc := range []struct {
-		in  *Config
-		out bool
-	}{
-		{&Config{
-			WorkingDirectory: "/tmp",
-		}, false},
-		{&Config{}, false},
-		{&Config{
-			SharedStoreKeyPrefix:      "index/",
-			CompactionInterval:        10 * time.Minute,
-			RetentionDeleteDelay:      2 * time.Hour,
-			RetentionDeleteWorkCount:  150,
-			DeleteRequestCancelPeriod: 24 * time.Hour,
-			MaxCompactionParallelism:  1,
-		}, true},
-	} {
-		t.Run(fmt.Sprint(i), func(t *testing.T) {
-			require.Equal(t, tc.out, tc.in.IsDefaults())
-		})
-	}
-}
-
 func TestCompactor_RunCompaction(t *testing.T) {
 	tempDir, err := ioutil.TempDir("", "compactor-run-compaction")
 	require.NoError(t, err)
diff --git a/pkg/storage/stores/shipper/compactor/metrics.go b/pkg/storage/stores/shipper/compactor/metrics.go
index fdb304b7897b..94d2f1ca2822 100644
--- a/pkg/storage/stores/shipper/compactor/metrics.go
+++ b/pkg/storage/stores/shipper/compactor/metrics.go
@@ -14,6 +14,7 @@ type metrics struct {
 	compactTablesOperationTotal           *prometheus.CounterVec
 	compactTablesOperationDurationSeconds prometheus.Gauge
 	compactTablesOperationLastSuccess     prometheus.Gauge
+	compactorRunning                      prometheus.Gauge
 }
 
 func newMetrics(r prometheus.Registerer) *metrics {
@@ -33,6 +34,11 @@ func newMetrics(r prometheus.Registerer) *metrics {
 			Name:      "compact_tables_operation_last_successful_run_timestamp_seconds",
 			Help:      "Unix timestamp of the last successful compaction run",
 		}),
+		compactorRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Namespace: "loki_boltdb_shipper",
+			Name:      "compactor_running",
+			Help:      "Value will be 1 if compactor is currently running on this instance",
+		}),
 	}
 
 	return &m
diff --git a/pkg/util/ring_config.go b/pkg/util/ring_config.go
new file mode 100644
index 000000000000..a0ae0e6e3e30
--- /dev/null
+++ b/pkg/util/ring_config.go
@@ -0,0 +1,95 @@
+package util
+
+import (
+	"flag"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/flagext"
+	"github.com/grafana/dskit/kv"
+
+	"github.com/cortexproject/cortex/pkg/ring"
+	util_log "github.com/cortexproject/cortex/pkg/util/log"
+)
+
+// RingConfig masks the ring lifecycler config which contains
+// many options not really required by the distributors ring. This config
+// is used to strip down the config to the minimum, and avoid confusion
+// to the user.
+type RingConfig struct {
+	KVStore              kv.Config     `yaml:"kvstore"`
+	HeartbeatPeriod      time.Duration `yaml:"heartbeat_period"`
+	HeartbeatTimeout     time.Duration `yaml:"heartbeat_timeout"`
+	TokensFilePath       string        `yaml:"tokens_file_path"`
+	ZoneAwarenessEnabled bool          `yaml:"zone_awareness_enabled"`
+
+	// Instance details
+	InstanceID             string   `yaml:"instance_id"`
+	InstanceInterfaceNames []string `yaml:"instance_interface_names"`
+	InstancePort           int      `yaml:"instance_port"`
+	InstanceAddr           string   `yaml:"instance_addr"`
+	InstanceZone           string   `yaml:"instance_availability_zone"`
+
+	// Injected internally
+	ListenPort int `yaml:"-"`
+
+	ObservePeriod time.Duration `yaml:"-"`
+}
+
+// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
+// storePrefix is used to set the path in the KVStore and should end with a /
+func (cfg *RingConfig) RegisterFlagsWithPrefix(flagsPrefix, storePrefix string, f *flag.FlagSet) {
+	hostname, err := os.Hostname()
+	if err != nil {
+		level.Error(util_log.Logger).Log("msg", "failed to get hostname", "err", err)
+		os.Exit(1)
+	}
+
+	// Ring flags
+	cfg.KVStore.RegisterFlagsWithPrefix(flagsPrefix+"ring.", storePrefix, f)
+	f.DurationVar(&cfg.HeartbeatPeriod, flagsPrefix+"ring.heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.")
+	f.DurationVar(&cfg.HeartbeatTimeout, flagsPrefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which compactors are considered unhealthy within the ring. 0 = never (timeout disabled).")
+	f.StringVar(&cfg.TokensFilePath, flagsPrefix+"ring.tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
+	f.BoolVar(&cfg.ZoneAwarenessEnabled, flagsPrefix+"ring.zone-awareness-enabled", false, "True to enable zone-awareness and replicate blocks across different availability zones.")
+
+	// Instance flags
+	cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
+	f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), flagsPrefix+"ring.instance-interface-names", "Name of network interface to read address from.")
+	f.StringVar(&cfg.InstanceAddr, flagsPrefix+"ring.instance-addr", "", "IP address to advertise in the ring.")
+	f.IntVar(&cfg.InstancePort, flagsPrefix+"ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).")
+	f.StringVar(&cfg.InstanceID, flagsPrefix+"ring.instance-id", hostname, "Instance ID to register in the ring.")
+	f.StringVar(&cfg.InstanceZone, flagsPrefix+"ring.instance-availability-zone", "", "The availability zone where this instance is running. Required if zone-awareness is enabled.")
+}
+
+// ToLifecyclerConfig returns a LifecyclerConfig based on the provided ring config.
+func (cfg *RingConfig) ToLifecyclerConfig(numTokens int) (ring.BasicLifecyclerConfig, error) {
+	instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames)
+	if err != nil {
+		return ring.BasicLifecyclerConfig{}, err
+	}
+
+	instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort)
+
+	return ring.BasicLifecyclerConfig{
+		ID:                  cfg.InstanceID,
+		Addr:                fmt.Sprintf("%s:%d", instanceAddr, instancePort),
+		Zone:                cfg.InstanceZone,
+		HeartbeatPeriod:     cfg.HeartbeatPeriod,
+		TokensObservePeriod: 0,
+		NumTokens:           numTokens,
+	}, nil
+}
+
+func (cfg *RingConfig) ToRingConfig(replicationFactor int) ring.Config {
+	rc := ring.Config{}
+	flagext.DefaultValues(&rc)
+
+	rc.KVStore = cfg.KVStore
+	rc.HeartbeatTimeout = cfg.HeartbeatTimeout
+	rc.ZoneAwarenessEnabled = cfg.ZoneAwarenessEnabled
+	rc.ReplicationFactor = replicationFactor
+
+	return rc
+}
diff --git a/pkg/util/ring_watcher.go b/pkg/util/ring_watcher.go
index ee9779ab6ad1..918239243f18 100644
--- a/pkg/util/ring_watcher.go
+++ b/pkg/util/ring_watcher.go
@@ -86,12 +86,12 @@ func (w *ringWatcher) lookupAddresses() {
 	}
 
 	for _, ta := range toAdd {
-		level.Debug(w.log).Log("msg", fmt.Sprintf("adding connection to scheduler at address: %s", ta))
+		level.Debug(w.log).Log("msg", fmt.Sprintf("adding connection to address: %s", ta))
 		w.notifications.AddressAdded(ta)
 	}
 
 	for _, tr := range toRemove {
-		level.Debug(w.log).Log("msg", fmt.Sprintf("removing connection to scheduler at address: %s", tr))
+		level.Debug(w.log).Log("msg", fmt.Sprintf("removing connection to address: %s", tr))
 		w.notifications.AddressRemoved(tr)
 	}
 
diff --git a/production/loki-mixin/alerts.libsonnet b/production/loki-mixin/alerts.libsonnet
index bb9dab6eb1db..43d8d86bd330 100644
--- a/production/loki-mixin/alerts.libsonnet
+++ b/production/loki-mixin/alerts.libsonnet
@@ -51,6 +51,21 @@
           |||,
         },
       },
+      {
+        alert: 'LokiTooManyCompactorsRunning',
+        expr: |||
+          sum(loki_boltdb_shipper_compactor_running) by (namespace) > 1
+        |||,
+        'for': '5m',
+        labels: {
+          severity: 'warning',
+        },
+        annotations: {
+          message: |||
+            {{ $labels.namespace }} has had {{ printf "%.0f" $value }} compactors running for more than 5m. Only one compactor should run at a time.
+          |||,
+        },
+      },
     ],
   },
 ],
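
For reference, an illustrative sketch (not part of the change set above) of how the new settings might be combined when running more than one compactor instance. The keys come from the documentation and tests in this PR; the object store type, working directory, and path prefix below are placeholder values.

```yaml
# Illustrative only: every instance runs the compactor module, but the ring
# elects a single leader to perform compactions at any given time.
compactor:
  working_directory: /loki/compactor   # placeholder path
  shared_store: s3                     # placeholder; any supported shared store
  compactor_ring:
    kvstore:
      store: memberlist                # share the ring like the other Loki rings

common:
  path_prefix: /loki                   # required when persist_tokens is true
  persist_tokens: true                 # writes ingester/compactor/scheduler tokens under /loki
```

With more than one instance configured this way, the new `loki_boltdb_shipper_compactor_running` gauge, the `/compactor/ring` page, and the `LokiTooManyCompactorsRunning` alert added above can be used to confirm that only a single compactor is active at a time.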