From 426dbc0b61b8a5ec1ffed8ccd06d3a529e793993 Mon Sep 17 00:00:00 2001
From: Edward Welch
Date: Wed, 27 Oct 2021 12:38:57 -0400
Subject: [PATCH 01/14] add a ring to the compactor

---
 docs/sources/configuration/_index.md          |  93 +++++++++++
 pkg/loki/config_wrapper.go                    |  14 ++
 pkg/loki/modules.go                           |   5 +
 .../stores/shipper/compactor/compactor.go     | 145 +++++++++++++++++-
 .../shipper/compactor/compactor_ring.go       | 110 +++++++++++++
 5 files changed, 365 insertions(+), 2 deletions(-)
 create mode 100644 pkg/storage/stores/shipper/compactor/compactor_ring.go

diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md
index 6c142d77a2f1..3a5b9f6e0efc 100644
--- a/docs/sources/configuration/_index.md
+++ b/docs/sources/configuration/_index.md
@@ -1903,33 +1903,126 @@ compacts index shards to more performant forms.

 ```yaml
 # Directory where files can be downloaded for compaction.
+# CLI flag: -boltdb.shipper.compactor.working-directory
 [working_directory: <string>]

 # The shared store used for storing boltdb files.
 # Supported types: gcs, s3, azure, swift, filesystem.
+# CLI flag: -boltdb.shipper.compactor.shared-store
 [shared_store: <string>]

 # Prefix to add to object keys in shared store.
 # Path separator (if any) should always be a '/'.
 # Prefix should never start with a separator but should always end with it.
+# CLI flag: -boltdb.shipper.compactor.shared-store.key-prefix
 [shared_store_key_prefix: <string> | default = "index/"]

 # Interval at which to re-run the compaction operation (or retention if enabled).
+# CLI flag: -boltdb.shipper.compactor.compaction-interval
 [compaction_interval: <duration> | default = 10m]

 # (Experimental) Activate custom (per-stream, per-tenant) retention.
+# CLI flag: -boltdb.shipper.compactor.retention-enabled
 [retention_enabled: <boolean> | default = false]

 # Delay after which chunks will be fully deleted during retention.
+# CLI flag: -boltdb.shipper.compactor.retention-delete-delay
 [retention_delete_delay: <duration> | default = 2h]

 # The total number of workers to use to delete chunks.
+# CLI flag: -boltdb.shipper.compactor.retention-delete-worker-count
 [retention_delete_worker_count: <int> | default = 150]

 # Allow cancellation of delete requests until this duration after they are created.
 # Data is deleted only after delete requests are older than this duration.
 # Ideally this should be set to at least 24h.
+# CLI flag: -boltdb.shipper.compactor.delete-request-cancel-period
 [delete_request_cancel_period: <duration> | default = 24h]
+
+# Maximum number of tables to compact in parallel.
+# When increasing this value, make sure the compactor has enough disk space
+# allocated to store and compact that many tables.
+# CLI flag: -boltdb.shipper.compactor.max-compaction-parallelism
+[max_compaction_parallelism: <int> | default = 1]
+
+# The hash ring configuration used by compactors to elect a single instance for running compactions.
+compactor_ring:
+  # The key-value store used to share the hash ring across multiple instances.
+  kvstore:
+    # Backend storage to use for the ring. Supported values are: consul, etcd,
+    # inmemory, memberlist, multi.
+    # CLI flag: -boltdb.shipper.compactor.ring.store
+    [store: <string> | default = "memberlist"]
+
+    # The prefix for the keys in the store. Should end with a /.
+    # CLI flag: -boltdb.shipper.compactor.ring.prefix
+    [prefix: <string> | default = "compactors/"]
+
+    # The consul_config configures the consul client.
+    # The CLI flags prefix for this block config is: boltdb.shipper.compactor.ring
+    [consul: <consul_config>]
+
+    # The etcd_config configures the etcd client.
+    # The CLI flags prefix for this block config is: boltdb.shipper.compactor.ring
+    [etcd: <etcd_config>]
+
+    multi:
+      # Primary backend storage used by multi-client.
+      # CLI flag: -boltdb.shipper.compactor.ring.multi.primary
+      [primary: <string> | default = ""]
+
+      # Secondary backend storage used by multi-client.
+      # CLI flag: -boltdb.shipper.compactor.ring.multi.secondary
+      [secondary: <string> | default = ""]
+
+      # Mirror writes to secondary store.
+      # CLI flag: -boltdb.shipper.compactor.ring.multi.mirror-enabled
+      [mirror_enabled: <boolean> | default = false]
+
+      # Timeout for storing value to secondary store.
+      # CLI flag: -boltdb.shipper.compactor.ring.multi.mirror-timeout
+      [mirror_timeout: <duration> | default = 2s]
+
+  # Interval between heartbeats sent to the ring. 0 = disabled.
+  # CLI flag: -boltdb.shipper.compactor.ring.heartbeat-period
+  [heartbeat_period: <duration> | default = 15s]
+
+  # The heartbeat timeout after which compactors are considered unhealthy
+  # within the ring. 0 = never (timeout disabled).
+  # CLI flag: -boltdb.shipper.compactor.ring.heartbeat-timeout
+  [heartbeat_timeout: <duration> | default = 1m]
+
+  # File path where tokens are stored. If empty, tokens are neither stored at
+  # shutdown nor restored at startup.
+  # CLI flag: -boltdb.shipper.compactor.ring.tokens-file-path
+  [tokens_file_path: <string> | default = ""]
+
+  # True to enable zone-awareness and replicate blocks across different
+  # availability zones.
+  # CLI flag: -boltdb.shipper.compactor.ring.zone-awareness-enabled
+  [zone_awareness_enabled: <boolean> | default = false]
+
+  # Name of network interface to read addresses from.
+  # CLI flag: -boltdb.shipper.compactor.ring.instance-interface-names
+  [instance_interface_names: <list of string> | default = [eth0 en0]]
+
+  # IP address to advertise in the ring.
+  # CLI flag: -boltdb.shipper.compactor.ring.instance-addr
+  [instance_addr: <string> | default = first from instance_interface_names]
+
+  # Port to advertise in the ring.
+  # CLI flag: -boltdb.shipper.compactor.ring.instance-port
+  [instance_port: <int> | default = server.grpc-listen-port]
+
+  # Instance ID to register in the ring.
+  # CLI flag: -boltdb.shipper.compactor.ring.instance-id
+  [instance_id: <string> | default = os.Hostname()]
+
+  # The availability zone where this instance is running. Required if
+  # zone-awareness is enabled.
+  # CLI flag: -boltdb.shipper.compactor.ring.instance-availability-zone
+  [instance_availability_zone: <string> | default = ""]
```
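For illustration, a minimal sketch of what this section could look like in a real config, assuming the block lives under the top-level `compactor` key; the paths and store type are placeholders, and the `compactor_ring` values shown are just the defaults documented above:

```yaml
compactor:
  working_directory: /loki/compactor
  shared_store: gcs
  compaction_interval: 10m
  compactor_ring:
    kvstore:
      store: memberlist
    heartbeat_period: 15s
    heartbeat_timeout: 1m
```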

## limits_config

diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go
index c64d78cb1a50..78671ec54eae 100644
--- a/pkg/loki/config_wrapper.go
+++ b/pkg/loki/config_wrapper.go
@@ -146,6 +146,19 @@ func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) {
 	r.QueryScheduler.SchedulerRing.TokensFilePath = lc.TokensFilePath
 	r.QueryScheduler.SchedulerRing.KVStore.Store = s
 	r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = sc
+
+	// Compactor
+	r.CompactorConfig.CompactorRing.HeartbeatTimeout = rc.HeartbeatTimeout
+	r.CompactorConfig.CompactorRing.HeartbeatPeriod = lc.HeartbeatPeriod
+	r.CompactorConfig.CompactorRing.InstancePort = lc.Port
+	r.CompactorConfig.CompactorRing.InstanceAddr = lc.Addr
+	r.CompactorConfig.CompactorRing.InstanceID = lc.ID
+	r.CompactorConfig.CompactorRing.InstanceInterfaceNames = lc.InfNames
+	r.CompactorConfig.CompactorRing.InstanceZone = lc.Zone
+	r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
+	r.CompactorConfig.CompactorRing.TokensFilePath = lc.TokensFilePath
+	r.CompactorConfig.CompactorRing.KVStore.Store = s
+	r.CompactorConfig.CompactorRing.KVStore.StoreConfig = sc
 }

 func applyPathPrefixDefaults(r *ConfigWrapper, defaults ConfigWrapper) {
@@ -184,6 +197,7 @@ func applyMemberlistConfig(r *ConfigWrapper) {
 		r.Distributor.DistributorRing.KVStore.Store = memberlistStr
 		r.Ruler.Ring.KVStore.Store = memberlistStr
 		r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr
+		r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr
 	}
 }

diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index d8cf36ea2387..6c22057cb3f3 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -652,6 +652,10 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
 }

 func (t *Loki) initCompactor() (services.Service, error) {
+	// Set values for the compactor ring from other sections of the config struct
+	t.Cfg.CompactorConfig.CompactorRing.ListenPort = t.Cfg.Server.GRPCListenPort
+	t.Cfg.CompactorConfig.CompactorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
+
 	err := t.Cfg.SchemaConfig.Load()
 	if err != nil {
 		return nil, err
@@ -661,6 +665,7 @@ func (t *Loki) initCompactor() (services.Service, error) {
 		return nil, err
 	}

+	t.Server.HTTP.Handle("/compactor/ring", t.compactor)
 	if t.Cfg.CompactorConfig.RetentionEnabled {
 		t.Server.HTTP.Path("/loki/api/admin/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
 		t.Server.HTTP.Path("/loki/api/admin/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))

diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go
index 899c4657f896..05d5bf7a1fee 100644
--- a/pkg/storage/stores/shipper/compactor/compactor.go
+++ b/pkg/storage/stores/shipper/compactor/compactor.go
@@ -2,16 +2,19 @@ package compactor

 import (
 	"context"
-	"errors"
 	"flag"
+	"net/http"
 	"path/filepath"
 	"reflect"
 	"sync"
 	"time"

+	"github.com/cortexproject/cortex/pkg/ring"
 	util_log "github.com/cortexproject/cortex/pkg/util/log"
 	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/kv"
 	"github.com/grafana/dskit/services"
+	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"

@@ -26,6 +29,12 @@ import (
 	shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
 )

+const (
+	// ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance
+	// in the ring will be automatically removed.
+	ringAutoForgetUnhealthyPeriods = 10
+)
+
 type Config struct {
 	WorkingDirectory string `yaml:"working_directory"`
 	SharedStoreType  string `yaml:"shared_store"`
@@ -36,6 +45,7 @@ type Config struct {
 	RetentionDeleteWorkCount  int           `yaml:"retention_delete_worker_count"`
 	DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
 	MaxCompactionParallelism  int           `yaml:"max_compaction_parallelism"`
+	CompactorRing             RingConfig    `yaml:"compactor_ring,omitempty"`
 }

 // RegisterFlags registers flags.
@@ -49,6 +59,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total number of workers to use to delete chunks.")
 	f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete requests until this duration after they are created. Data is deleted only after delete requests are older than this duration. Ideally this should be set to at least 24h.")
 	f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. When increasing this value, make sure the compactor has enough disk space allocated to store and compact that many tables.")
+	cfg.CompactorRing.RegisterFlags(f)
 }

 func (cfg *Config) IsDefaults() bool {
@@ -76,6 +87,14 @@ type Compactor struct {
 	deleteRequestsManager *deletion.DeleteRequestsManager
 	expirationChecker     retention.ExpirationChecker
 	metrics               *metrics
+
+	// Ring used for running a single compactor
+	ringLifecycler *ring.BasicLifecycler
+	ring           *ring.Ring
+
+	// Subservices manager.
+	subservices        *services.Manager
+	subservicesWatcher *services.FailureWatcher
 }

 func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, r prometheus.Registerer) (*Compactor, error) {
@@ -87,11 +106,54 @@ func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_st
 		cfg: cfg,
 	}

+	ringStore, err := kv.NewClient(
+		cfg.CompactorRing.KVStore,
+		ring.GetCodec(),
+		kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", r), "compactor"),
+		util_log.Logger,
+	)
+	if err != nil {
+		return nil, errors.Wrap(err, "create KV store client")
+	}
+	lifecyclerCfg, err := cfg.CompactorRing.ToLifecyclerConfig()
+	if err != nil {
+		return nil, errors.Wrap(err, "invalid ring lifecycler config")
+	}
+
+	// Define lifecycler delegates in reverse order (last to be called defined first because they're
+	// chained via "next delegate").
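+	// At runtime the outermost delegate is invoked first, so the effective order is:
+	// auto-forget -> tokens persistency -> leave-on-stopping -> the compactor's own callbacks.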
+	delegate := ring.BasicLifecyclerDelegate(compactor)
+	delegate = ring.NewLeaveOnStoppingDelegate(delegate, util_log.Logger)
+	delegate = ring.NewTokensPersistencyDelegate(cfg.CompactorRing.TokensFilePath, ring.JOINING, delegate, util_log.Logger)
+	delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.CompactorRing.HeartbeatTimeout, delegate, util_log.Logger)
+
+	compactor.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, RingNameForServer, RingKey, ringStore, delegate, util_log.Logger, r)
+	if err != nil {
+		return nil, errors.Wrap(err, "create ring lifecycler")
+	}
+
+	ringCfg := cfg.CompactorRing.ToRingConfig()
+	compactor.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, RingNameForServer, RingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy())
+	if err != nil {
+		return nil, errors.Wrap(err, "create ring client")
+	}
+
+	if r != nil {
+		r.MustRegister(compactor.ring)
+	}
+
+	compactor.subservices, err = services.NewManager(compactor.ringLifecycler, compactor.ring)
+	if err != nil {
+		return nil, err
+	}
+	compactor.subservicesWatcher = services.NewFailureWatcher()
+	compactor.subservicesWatcher.WatchManager(compactor.subservices)
+
 	if err := compactor.init(storageConfig, schemaConfig, limits, r); err != nil {
 		return nil, err
 	}

-	compactor.Service = services.NewBasicService(nil, compactor.loop, nil)
+	compactor.Service = services.NewBasicService(compactor.starting, compactor.loop, compactor.stopping)
 	return compactor, nil
 }

@@ -143,6 +205,54 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage
 	return nil
 }

+func (c *Compactor) starting(ctx context.Context) (err error) {
+	// In case this function returns an error we want to unregister the instance
+	// from the ring. We do it while ensuring dependencies are gracefully stopped if they
+	// were already started.
+	defer func() {
+		if err == nil || c.subservices == nil {
+			return
+		}
+
+		if stopErr := services.StopManagerAndAwaitStopped(context.Background(), c.subservices); stopErr != nil {
+			level.Error(util_log.Logger).Log("msg", "failed to gracefully stop compactor dependencies", "err", stopErr)
+		}
+	}()
+
+	if err := services.StartManagerAndAwaitHealthy(ctx, c.subservices); err != nil {
+		return errors.Wrap(err, "unable to start compactor subservices")
+	}
+
+	// The BasicLifecycler does not automatically move state to ACTIVE such that any additional work that
+	// someone wants to do can be done before becoming ACTIVE. For the compactor we don't currently
+	// have any additional work so we can become ACTIVE right away.
+
+	// Wait until the ring client detected this instance in the JOINING state to
+	// make sure that when we'll run the initial sync we already know the tokens
+	// assigned to this instance.
+	level.Info(util_log.Logger).Log("msg", "waiting until compactor is JOINING in the ring")
+	if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil {
+		return err
+	}
+	level.Info(util_log.Logger).Log("msg", "compactor is JOINING in the ring")
+
+	// Change ring state to ACTIVE
+	if err = c.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
+		return errors.Wrapf(err, "switch instance to %s in the ring", ring.ACTIVE)
+	}
+
+	// Wait until the ring client detected this instance in the ACTIVE state to
+	// make sure that when we'll run the loop it won't be detected as a ring
+	// topology change.
+	level.Info(util_log.Logger).Log("msg", "waiting until scheduler is ACTIVE in the ring")
+	if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
+		return err
+	}
+	level.Info(util_log.Logger).Log("msg", "scheduler is ACTIVE in the ring")
+
+	return nil
+}
+
 func (c *Compactor) loop(ctx context.Context) error {
 	if c.cfg.RetentionEnabled {
 		defer c.deleteRequestsStore.Stop()
@@ -190,6 +300,10 @@ func (c *Compactor) loop(ctx context.Context) error {
 	return nil
 }

+func (c *Compactor) stopping(_ error) error {
+	return services.StopManagerAndAwaitStopped(context.Background(), c.subservices)
+}
+
 func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {
 	table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.indexStorageClient, c.cfg.RetentionEnabled, c.tableMarker)
 	if err != nil {
@@ -340,3 +454,30 @@ func (e *expirationChecker) IntervalHasExpiredChunks(interval model.Interval) bo
 func (e *expirationChecker) DropFromIndex(ref retention.ChunkEntry, tableEndTime model.Time, now model.Time) bool {
 	return e.retentionExpiryChecker.DropFromIndex(ref, tableEndTime, now) || e.deletionExpiryChecker.DropFromIndex(ref, tableEndTime, now)
 }
+
+func (c *Compactor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
+	// When we initialize the compactor instance in the ring we want to start from
+	// a clean situation, so regardless of the previous state we set it to JOINING, while we keep
+	// existing tokens (if any) or the ones loaded from file.
+	var tokens []uint32
+	if instanceExists {
+		tokens = instanceDesc.GetTokens()
+	}
+
+	takenTokens := ringDesc.GetTokens()
+	newTokens := ring.GenerateTokens(RingNumTokens-len(tokens), takenTokens)
+
+	// Tokens sorting will be enforced by the parent caller.
+	tokens = append(tokens, newTokens...)
+
+	return ring.JOINING, tokens
+}
+
+func (c *Compactor) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {}
+func (c *Compactor) OnRingInstanceStopping(_ *ring.BasicLifecycler)             {}
+func (c *Compactor) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) {
+}
+
+func (c *Compactor) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	c.ring.ServeHTTP(w, req)
+}

diff --git a/pkg/storage/stores/shipper/compactor/compactor_ring.go b/pkg/storage/stores/shipper/compactor/compactor_ring.go
new file mode 100644
index 000000000000..b7a163279b31
--- /dev/null
+++ b/pkg/storage/stores/shipper/compactor/compactor_ring.go
@@ -0,0 +1,110 @@
+package compactor
+
+import (
+	"flag"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/flagext"
+	"github.com/grafana/dskit/kv"
+
+	"github.com/cortexproject/cortex/pkg/ring"
+	util_log "github.com/cortexproject/cortex/pkg/util/log"
+)
+
+const (
+	// RingKey is the key under which we store the store gateways ring in the KVStore.
+	RingKey = "compactor"
+
+	// RingNameForServer is the name of the ring used by the store gateway server.
+	RingNameForServer = "compactor"
+
+	// RingNameForClient is the name of the ring used by the store gateway client (we need
+	// a different name to avoid clashing Prometheus metrics when running in single-binary).
+	RingNameForClient = "compactor-client"
+
+	// We use a safe default instead of exposing to config option to the user
+	// in order to simplify the config.
+ RingNumTokens = 512 +) + +// RingConfig masks the ring lifecycler config which contains +// many options not really required by the distributors ring. This config +// is used to strip down the config to the minimum, and avoid confusion +// to the user. +type RingConfig struct { + KVStore kv.Config `yaml:"kvstore"` + HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` + HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` + TokensFilePath string `yaml:"tokens_file_path"` + ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` + + // Instance details + InstanceID string `yaml:"instance_id"` + InstanceInterfaceNames []string `yaml:"instance_interface_names"` + InstancePort int `yaml:"instance_port"` + InstanceAddr string `yaml:"instance_addr"` + InstanceZone string `yaml:"instance_availability_zone"` + + // Injected internally + ListenPort int `yaml:"-"` + + ObservePeriod time.Duration `yaml:"-"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { + hostname, err := os.Hostname() + if err != nil { + level.Error(util_log.Logger).Log("msg", "failed to get hostname", "err", err) + os.Exit(1) + } + + // Ring flags + cfg.KVStore.RegisterFlagsWithPrefix("boltdb.shipper.compactor.ring.", "compactors/", f) + f.DurationVar(&cfg.HeartbeatPeriod, "boltdb.shipper.compactor.ring.heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.") + f.DurationVar(&cfg.HeartbeatTimeout, "boltdb.shipper.compactor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which compactors are considered unhealthy within the ring. 0 = never (timeout disabled).") + f.StringVar(&cfg.TokensFilePath, "boltdb.shipper.compactor.ring.tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.") + f.BoolVar(&cfg.ZoneAwarenessEnabled, "boltdb.shipper.compactor.ring.zone-awareness-enabled", false, "True to enable zone-awareness and replicate blocks across different availability zones.") + + // Instance flags + cfg.InstanceInterfaceNames = []string{"eth0", "en0"} + f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), "boltdb.shipper.compactor.ring.instance-interface-names", "Name of network interface to read address from.") + f.StringVar(&cfg.InstanceAddr, "boltdb.shipper.compactor.ring.instance-addr", "", "IP address to advertise in the ring.") + f.IntVar(&cfg.InstancePort, "boltdb.shipper.compactor.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).") + f.StringVar(&cfg.InstanceID, "boltdb.shipper.compactor.ring.instance-id", hostname, "Instance ID to register in the ring.") + f.StringVar(&cfg.InstanceZone, "boltdb.shipper.compactor.ring.instance-availability-zone", "", "The availability zone where this instance is running. Required if zone-awareness is enabled.") +} + +// ToLifecyclerConfig returns a LifecyclerConfig based on the compactor ring config. 
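+// The advertised address and port below are resolved the same way as Loki's
+// other rings: an explicitly configured instance_addr/instance_port wins,
+// otherwise the first address of instance_interface_names is used together
+// with the server's gRPC listen port.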
+func (cfg *RingConfig) ToLifecyclerConfig() (ring.BasicLifecyclerConfig, error) { + instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames) + if err != nil { + return ring.BasicLifecyclerConfig{}, err + } + + instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort) + + return ring.BasicLifecyclerConfig{ + ID: cfg.InstanceID, + Addr: fmt.Sprintf("%s:%d", instanceAddr, instancePort), + Zone: cfg.InstanceZone, + HeartbeatPeriod: cfg.HeartbeatPeriod, + TokensObservePeriod: 0, + NumTokens: RingNumTokens, + }, nil +} + +func (cfg *RingConfig) ToRingConfig() ring.Config { + rc := ring.Config{} + flagext.DefaultValues(&rc) + + rc.KVStore = cfg.KVStore + rc.HeartbeatTimeout = cfg.HeartbeatTimeout + rc.ZoneAwarenessEnabled = cfg.ZoneAwarenessEnabled + rc.ReplicationFactor = 2 + + return rc +} From b214951510343114a2849154e2a837593b653bc4 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Wed, 27 Oct 2021 18:01:07 -0400 Subject: [PATCH 02/14] wiring up the start/stop functionality --- pkg/loki/loki.go | 8 +- .../stores/shipper/compactor/compactor.go | 89 +++++++++++++++++-- .../shipper/compactor/compactor_ring.go | 2 +- 3 files changed, 82 insertions(+), 17 deletions(-) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 6b360962f191..8651110ef392 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -453,7 +453,7 @@ func (t *Loki) setupModuleManager() error { Compactor: {Server, Overrides}, IndexGateway: {Server}, IngesterQuerier: {Ring}, - All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler}, + All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor}, Read: {QueryScheduler, QueryFrontend, Querier, Ruler}, Write: {Ingester, Distributor}, } @@ -463,12 +463,6 @@ func (t *Loki) setupModuleManager() error { deps[Store] = append(deps[Store], IngesterQuerier) } - // If we are running Loki with boltdb-shipper as a single binary, without clustered mode(which should always be the case when using inmemory ring), - // we should start compactor as well for better user experience. - if storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Store == "inmemory" { - deps[All] = append(deps[All], Compactor) - } - // If the query scheduler and querier are running together, make sure the scheduler goes // first to initialize the ring that will also be used by the querier if (t.Cfg.isModuleEnabled(Querier) && t.Cfg.isModuleEnabled(QueryScheduler)) || t.Cfg.isModuleEnabled(Read) || t.Cfg.isModuleEnabled(All) { diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 05d5bf7a1fee..430fff95a8d7 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -3,6 +3,7 @@ package compactor import ( "context" "flag" + "fmt" "net/http" "path/filepath" "reflect" @@ -33,6 +34,9 @@ const ( // ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance // in the ring will be automatically removed. 
 	ringAutoForgetUnhealthyPeriods = 10
+
+	// RingKeyOfLeader is a somewhat arbitrary ID to pull from the ring to see who will be elected the leader
+	RingKeyOfLeader = 100
 )

 type Config struct {
@@ -87,10 +91,13 @@ type Compactor struct {
 	deleteRequestsManager *deletion.DeleteRequestsManager
 	expirationChecker     retention.ExpirationChecker
 	metrics               *metrics
+	running               bool
+	wg                    sync.WaitGroup

 	// Ring used for running a single compactor
 	ringLifecycler *ring.BasicLifecycler
 	ring           *ring.Ring
+	ringPollPeriod time.Duration

 	// Subservices manager.
 	subservices *services.Manager
@@ -103,7 +110,8 @@ func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_st
 	}

 	compactor := &Compactor{
-		cfg: cfg,
+		cfg:            cfg,
+		ringPollPeriod: 5 * time.Second,
 	}

 	ringStore, err := kv.NewClient(
@@ -259,16 +267,80 @@ func (c *Compactor) loop(ctx context.Context) error {
 		defer c.deleteRequestsManager.Stop()
 	}

+	syncTicker := time.NewTicker(c.ringPollPeriod)
+	defer syncTicker.Stop()
+
+	var runningCtx context.Context
+	var runningCancel context.CancelFunc
+
+	for {
+		select {
+		case <-ctx.Done():
+			if runningCancel != nil {
+				runningCancel()
+			}
+			c.wg.Wait()
+			level.Info(util_log.Logger).Log("msg", "compactor exiting")
+			return nil
+		case <-syncTicker.C:
+			bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
+			rs, err := c.ring.Get(RingKeyOfLeader, ring.WriteNoExtend, bufDescs, bufHosts, bufZones)
+			if err != nil {
+				level.Error(util_log.Logger).Log("msg", "error asking the ring who should run the compactor, will check again", "err", err)
+				continue
+			}
+
+			addrs := rs.GetAddresses()
+			if len(addrs) != 1 {
+				level.Error(util_log.Logger).Log("msg", "too many addresses (more than one) returned when asking the ring who should run the compactor, will check again")
+				continue
+			}
+			if c.ringLifecycler.GetInstanceAddr() == addrs[0] {
+				// If not running, start
+				if !c.running {
+					level.Info(util_log.Logger).Log("msg", "this instance has been chosen to run the compactor, starting compactor")
+					runningCtx, runningCancel = context.WithCancel(ctx)
+					go c.runCompactions(runningCtx)
+					c.running = true
+				}
+			} else {
+				// If running, shutdown
+				if c.running {
+					level.Info(util_log.Logger).Log("msg", "this instance should no longer run the compactor, stopping compactor")
+					runningCancel()
+					c.wg.Wait()
+					c.running = false
+					level.Info(util_log.Logger).Log("msg", "compactor stopped")
+				}
+			}
+		}
+	}
+}
+
+func (c *Compactor) runCompactions(ctx context.Context) {
+	// To avoid races, wait 1 compaction interval before actually starting the compactor.
+	// This allows the ring to settle if there are a lot of ring changes and gives
+	// time for existing compactors to shut down before this starts, to avoid
+	// multiple compactors running at the same time.
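+	// Note: a newly elected instance waits out this delay as well, so after a
+	// leadership change the gap between two compaction runs can stretch to
+	// roughly twice the compaction interval.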
+ t := time.NewTimer(c.cfg.CompactionInterval) + level.Info(util_log.Logger).Log("msg", fmt.Sprintf("waiting %v for ring to stay stable and previous compactions to finish before starting compactor", c.cfg.CompactionInterval)) + select { + case <-ctx.Done(): + return + case <-t.C: + level.Info(util_log.Logger).Log("msg", "compactor startup delay completed, continuing to start compactor") + break + } + runCompaction := func() { err := c.RunCompaction(ctx) if err != nil { level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err) } } - var wg sync.WaitGroup - wg.Add(1) + c.wg.Add(1) go func() { - defer wg.Done() + defer c.wg.Done() runCompaction() ticker := time.NewTicker(c.cfg.CompactionInterval) @@ -284,20 +356,19 @@ func (c *Compactor) loop(ctx context.Context) error { } }() if c.cfg.RetentionEnabled { - wg.Add(1) + c.wg.Add(1) go func() { // starts the chunk sweeper defer func() { c.sweeper.Stop() - wg.Done() + c.wg.Done() }() c.sweeper.Start() <-ctx.Done() }() } - - wg.Wait() - return nil + level.Info(util_log.Logger).Log("msg", "compactor started") + return } func (c *Compactor) stopping(_ error) error { diff --git a/pkg/storage/stores/shipper/compactor/compactor_ring.go b/pkg/storage/stores/shipper/compactor/compactor_ring.go index b7a163279b31..41a0436aaf31 100644 --- a/pkg/storage/stores/shipper/compactor/compactor_ring.go +++ b/pkg/storage/stores/shipper/compactor/compactor_ring.go @@ -104,7 +104,7 @@ func (cfg *RingConfig) ToRingConfig() ring.Config { rc.KVStore = cfg.KVStore rc.HeartbeatTimeout = cfg.HeartbeatTimeout rc.ZoneAwarenessEnabled = cfg.ZoneAwarenessEnabled - rc.ReplicationFactor = 2 + rc.ReplicationFactor = 1 return rc } From 50aa1c0d4801ebc6b04fc07ef5bb28c23ffb2a7f Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Thu, 28 Oct 2021 08:39:54 -0400 Subject: [PATCH 03/14] add some monitoring for multiple compactors running at the same time. --- pkg/storage/stores/shipper/compactor/compactor.go | 12 +++++++++--- pkg/storage/stores/shipper/compactor/metrics.go | 6 ++++++ production/loki-mixin/alerts.libsonnet | 15 +++++++++++++++ 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 430fff95a8d7..a2eca2139149 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -252,11 +252,11 @@ func (c *Compactor) starting(ctx context.Context) (err error) { // Wait until the ring client detected this instance in the ACTIVE state to // make sure that when we'll run the loop it won't be detected as a ring // topology change. 
-	level.Info(util_log.Logger).Log("msg", "waiting until scheduler is ACTIVE in the ring")
+	level.Info(util_log.Logger).Log("msg", "waiting until compactor is ACTIVE in the ring")
 	if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
 		return err
 	}
-	level.Info(util_log.Logger).Log("msg", "scheduler is ACTIVE in the ring")
+	level.Info(util_log.Logger).Log("msg", "compactor is ACTIVE in the ring")

 	return nil
 }
@@ -302,6 +302,7 @@ func (c *Compactor) loop(ctx context.Context) error {
 					runningCtx, runningCancel = context.WithCancel(ctx)
 					go c.runCompactions(runningCtx)
 					c.running = true
+					c.metrics.compactorRunning.Set(1)
 				}
 			} else {
 				// If running, shutdown
@@ -310,6 +311,7 @@ func (c *Compactor) loop(ctx context.Context) error {
 					runningCancel()
 					c.wg.Wait()
 					c.running = false
+					c.metrics.compactorRunning.Set(0)
 					level.Info(util_log.Logger).Log("msg", "compactor stopped")
 				}
 			}
@@ -406,8 +408,9 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {

 	defer func() {
 		c.metrics.compactTablesOperationTotal.WithLabelValues(status).Inc()
+		runtime := time.Since(start)
 		if status == statusSuccess {
-			c.metrics.compactTablesOperationDurationSeconds.Set(time.Since(start).Seconds())
+			c.metrics.compactTablesOperationDurationSeconds.Set(runtime.Seconds())
 			c.metrics.compactTablesOperationLastSuccess.SetToCurrentTime()
 		}

@@ -418,6 +421,9 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {
 			c.expirationChecker.MarkPhaseFailed()
 		}
 	}
+		if runtime > c.cfg.CompactionInterval {
+			level.Warn(util_log.Logger).Log("msg", fmt.Sprintf("last compaction took %s, which is longer than the compaction interval of %s; this can lead to duplicate compactors running if not running a standalone compactor instance.", runtime, c.cfg.CompactionInterval))
+		}
 	}()

 	tables, err := c.indexStorageClient.ListTables(ctx)

diff --git a/pkg/storage/stores/shipper/compactor/metrics.go b/pkg/storage/stores/shipper/compactor/metrics.go
index fdb304b7897b..94d2f1ca2822 100644
--- a/pkg/storage/stores/shipper/compactor/metrics.go
+++ b/pkg/storage/stores/shipper/compactor/metrics.go
@@ -14,6 +14,7 @@ type metrics struct {
 	compactTablesOperationTotal           *prometheus.CounterVec
 	compactTablesOperationDurationSeconds prometheus.Gauge
 	compactTablesOperationLastSuccess     prometheus.Gauge
+	compactorRunning                      prometheus.Gauge
 }

 func newMetrics(r prometheus.Registerer) *metrics {
@@ -33,6 +34,11 @@ func newMetrics(r prometheus.Registerer) *metrics {
 			Name:      "compact_tables_operation_last_successful_run_timestamp_seconds",
 			Help:      "Unix timestamp of the last successful compaction run",
 		}),
+		compactorRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Namespace: "loki_boltdb_shipper",
+			Name:      "compactor_running",
+			Help:      "Value will be 1 if compactor is currently running on this instance",
+		}),
 	}

 	return &m

diff --git a/production/loki-mixin/alerts.libsonnet b/production/loki-mixin/alerts.libsonnet
index bb9dab6eb1db..43d8d86bd330 100644
--- a/production/loki-mixin/alerts.libsonnet
+++ b/production/loki-mixin/alerts.libsonnet
@@ -51,6 +51,21 @@
             |||,
           },
         },
+        {
+          alert: 'LokiTooManyCompactorsRunning',
+          expr: |||
+            sum(loki_boltdb_shipper_compactor_running) by (namespace) > 1
+          |||,
+          'for': '5m',
+          labels: {
+            severity: 'warning',
+          },
+          annotations: {
+            message: |||
+              {{ $labels.namespace }} has had {{ printf "%.0f" $value }} compactors running for more than 5m. Only one compactor should run at a time.
+ |||, + }, + }, ], }, ], From dd3a297d285d8c9754075a06235e251d456b7ce5 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Thu, 28 Oct 2021 08:56:53 -0400 Subject: [PATCH 04/14] Add compactor to `read` target --- pkg/loki/loki.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 8651110ef392..abba9c7c713c 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -454,7 +454,7 @@ func (t *Loki) setupModuleManager() error { IndexGateway: {Server}, IngesterQuerier: {Ring}, All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor}, - Read: {QueryScheduler, QueryFrontend, Querier, Ruler}, + Read: {QueryScheduler, QueryFrontend, Querier, Ruler, Compactor}, Write: {Ingester, Distributor}, } From 3f1f43e7bc29396d5afb91b7b0f373e56c7f2afa Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Thu, 28 Oct 2021 10:40:07 -0400 Subject: [PATCH 05/14] fix tests/validation requirements --- pkg/loki/modules.go | 5 ++++ .../stores/shipper/compactor/compactor.go | 14 +++------ .../shipper/compactor/compactor_test.go | 29 ++----------------- 3 files changed, 11 insertions(+), 37 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 6c22057cb3f3..b836f6de75b2 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -656,6 +656,11 @@ func (t *Loki) initCompactor() (services.Service, error) { t.Cfg.CompactorConfig.CompactorRing.ListenPort = t.Cfg.Server.GRPCListenPort t.Cfg.CompactorConfig.CompactorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + if !loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) { + level.Info(util_log.Logger).Log("msg", "Not using boltdb-shipper index, not starting compactor") + return nil, nil + } + err := t.Cfg.SchemaConfig.Load() if err != nil { return nil, err diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index a2eca2139149..2da08add030a 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -6,7 +6,6 @@ import ( "fmt" "net/http" "path/filepath" - "reflect" "sync" "time" @@ -66,12 +65,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.CompactorRing.RegisterFlags(f) } -func (cfg *Config) IsDefaults() bool { - cpy := &Config{} - cpy.RegisterFlags(flag.NewFlagSet("defaults", flag.ContinueOnError)) - return reflect.DeepEqual(cfg, cpy) -} - +// Validate verifies the config does not contain inappropriate values func (cfg *Config) Validate() error { if cfg.MaxCompactionParallelism < 1 { return errors.New("max compaction parallelism must be >= 1") @@ -105,8 +99,8 @@ type Compactor struct { } func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, r prometheus.Registerer) (*Compactor, error) { - if cfg.IsDefaults() { - return nil, errors.New("Must specify compactor config") + if cfg.SharedStoreType == "" { + return nil, errors.New("compactor shared_store_type must be specified") } compactor := &Compactor{ @@ -330,7 +324,7 @@ func (c *Compactor) runCompactions(ctx context.Context) { case <-ctx.Done(): return case <-t.C: - level.Info(util_log.Logger).Log("msg", "compactor startup delay completed, continuing to start compactor") + level.Info(util_log.Logger).Log("msg", "compactor startup delay completed") break } diff --git a/pkg/storage/stores/shipper/compactor/compactor_test.go b/pkg/storage/stores/shipper/compactor/compactor_test.go index 
e0cffec23a44..88208ce3b12a 100644 --- a/pkg/storage/stores/shipper/compactor/compactor_test.go +++ b/pkg/storage/stores/shipper/compactor/compactor_test.go @@ -2,16 +2,13 @@ package compactor import ( "context" - "fmt" "io/ioutil" "os" "path/filepath" "strings" "testing" - "time" "github.com/grafana/dskit/flagext" - "github.com/stretchr/testify/require" loki_storage "github.com/grafana/loki/pkg/storage" @@ -27,36 +24,14 @@ func setupTestCompactor(t *testing.T, tempDir string) *Compactor { cfg.SharedStoreType = "filesystem" cfg.RetentionEnabled = false + require.NoError(t, cfg.Validate()) + c, err := NewCompactor(cfg, storage.Config{FSConfig: local.FSConfig{Directory: tempDir}}, loki_storage.SchemaConfig{}, nil, nil) require.NoError(t, err) return c } -func TestIsDefaults(t *testing.T) { - for i, tc := range []struct { - in *Config - out bool - }{ - {&Config{ - WorkingDirectory: "/tmp", - }, false}, - {&Config{}, false}, - {&Config{ - SharedStoreKeyPrefix: "index/", - CompactionInterval: 10 * time.Minute, - RetentionDeleteDelay: 2 * time.Hour, - RetentionDeleteWorkCount: 150, - DeleteRequestCancelPeriod: 24 * time.Hour, - MaxCompactionParallelism: 1, - }, true}, - } { - t.Run(fmt.Sprint(i), func(t *testing.T) { - require.Equal(t, tc.out, tc.in.IsDefaults()) - }) - } -} - func TestCompactor_RunCompaction(t *testing.T) { tempDir, err := ioutil.TempDir("", "compactor-run-compaction") require.NoError(t, err) From 4270d000440c5f7da617aa2758dd6f7d306942f1 Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Fri, 29 Oct 2021 15:51:07 -0400 Subject: [PATCH 06/14] Update pkg/storage/stores/shipper/compactor/compactor_ring.go Co-authored-by: Owen Diehl --- pkg/storage/stores/shipper/compactor/compactor_ring.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/stores/shipper/compactor/compactor_ring.go b/pkg/storage/stores/shipper/compactor/compactor_ring.go index 41a0436aaf31..ad96895c3268 100644 --- a/pkg/storage/stores/shipper/compactor/compactor_ring.go +++ b/pkg/storage/stores/shipper/compactor/compactor_ring.go @@ -18,7 +18,7 @@ const ( // RingKey is the key under which we store the store gateways ring in the KVStore. RingKey = "compactor" - // RingNameForServer is the name of the ring used by the store gateway server. + // RingNameForServer is the name of the ring used by the compactor server. RingNameForServer = "compactor" // RingNameForClient is the name of the ring used by the store gateway client (we need From 0228c491de016a8f25ee459824f9956796688ca7 Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Fri, 29 Oct 2021 15:51:20 -0400 Subject: [PATCH 07/14] Update pkg/storage/stores/shipper/compactor/compactor_ring.go Co-authored-by: Owen Diehl --- pkg/storage/stores/shipper/compactor/compactor_ring.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/compactor_ring.go b/pkg/storage/stores/shipper/compactor/compactor_ring.go index ad96895c3268..859043c9f77b 100644 --- a/pkg/storage/stores/shipper/compactor/compactor_ring.go +++ b/pkg/storage/stores/shipper/compactor/compactor_ring.go @@ -21,9 +21,6 @@ const ( // RingNameForServer is the name of the ring used by the compactor server. RingNameForServer = "compactor" - // RingNameForClient is the name of the ring used by the store gateway client (we need - // a different name to avoid clashing Prometheus metrics when running in single-binary). - RingNameForClient = "compactor-client" // We use a safe default instead of exposing to config option to the user // in order to simplify the config. 
From c6df173d02dc5f00fcb5d2a73922f05e43897675 Mon Sep 17 00:00:00 2001
From: Ed Welch
Date: Fri, 29 Oct 2021 15:51:55 -0400
Subject: [PATCH 08/14] Update pkg/storage/stores/shipper/compactor/compactor_ring.go

Co-authored-by: Owen Diehl
---
 pkg/storage/stores/shipper/compactor/compactor_ring.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/storage/stores/shipper/compactor/compactor_ring.go b/pkg/storage/stores/shipper/compactor/compactor_ring.go
index 859043c9f77b..5f0b0cf76f3f 100644
--- a/pkg/storage/stores/shipper/compactor/compactor_ring.go
+++ b/pkg/storage/stores/shipper/compactor/compactor_ring.go
@@ -24,7 +24,7 @@ const (

 	// We use a safe default instead of exposing to config option to the user
 	// in order to simplify the config.
-	RingNumTokens = 512
+	RingNumTokens = 1
 )

 // RingConfig masks the ring lifecycler config which contains

From 4f8f36fbea072873e84810f88b06c0b52797f191 Mon Sep 17 00:00:00 2001
From: Edward Welch
Date: Sun, 31 Oct 2021 19:34:37 -0400
Subject: [PATCH 09/14] used a shared ring_config between scheduler and
 compactor to avoid some duplication

---
 pkg/scheduler/scheduler.go                    |  30 +++--
 pkg/scheduler/scheduler_ring.go               | 110 ------------------
 .../stores/shipper/compactor/compactor.go     |  52 ++++++---
 .../shipper/compactor/compactor_ring.go       | 107 -----------------
 pkg/util/ring_config.go                       |  95 +++++++++++++++
 5 files changed, 150 insertions(+), 244 deletions(-)
 delete mode 100644 pkg/scheduler/scheduler_ring.go
 delete mode 100644 pkg/storage/stores/shipper/compactor/compactor_ring.go
 create mode 100644 pkg/util/ring_config.go

diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 3de20ecbe5e3..75999fdd5567 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -30,6 +30,7 @@ import (
 	"github.com/weaveworks/common/user"
 	"google.golang.org/grpc"

+	lokiutil "github.com/grafana/loki/pkg/util"
 	lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
 )

@@ -41,6 +42,19 @@ const (
 	// ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance
 	// in the ring will be automatically removed.
 	ringAutoForgetUnhealthyPeriods = 10
+
+	// ringKey is the key under which we store the scheduler ring in the KVStore.
+	ringKey = "scheduler"
+
+	// ringNameForServer is the name of the ring used by the scheduler server.
+	ringNameForServer = "scheduler"
+
+	// ringReplicationFactor should be 1 because we only want to pull back one node from the ring.
+	ringReplicationFactor = 1
+
+	// ringNumTokens sets our single token in the ring;
+	// we only need to insert 1 token to be used for leader election purposes.
+	ringNumTokens = 1
 )

 // Scheduler is responsible for queueing and dispatching queries to Queriers.
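Both rings rely on the same election trick: each instance registers a single token, and lookups use a replication factor of 1, so hashing one fixed key into the ring yields exactly one healthy owner. A minimal sketch of that pattern, distilled from the compactor loop introduced in patch 02 (the `isLeader` helper and its signature are illustrative, not part of this change; the ring calls mirror the real code):

```go
// isLeader reports whether this instance currently owns the fixed election key.
// Every instance holds one token; with ReplicationFactor = 1 the ring returns a
// single healthy instance for any key, and ownership only moves when the ring
// topology changes.
func isLeader(r ring.ReadRing, instanceAddr string) (bool, error) {
	bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
	rs, err := r.Get(ringKeyOfLeader, ring.WriteNoExtend, bufDescs, bufHosts, bufZones)
	if err != nil {
		return false, err
	}
	addrs := rs.GetAddresses()
	if len(addrs) != 1 {
		return false, fmt.Errorf("expected exactly one instance, got %d", len(addrs))
	}
	return addrs[0] == instanceAddr, nil
}
```

Whichever instance sees its own advertised address come back starts `runCompactions`; every other instance stays idle and keeps polling on `ringPollPeriod`.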
@@ -96,8 +110,8 @@ type Config struct {
 	QuerierForgetDelay time.Duration     `yaml:"-"`
 	GRPCClientConfig   grpcclient.Config `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."`
 	// Schedulers ring
-	UseSchedulerRing bool       `yaml:"use_scheduler_ring"`
-	SchedulerRing    RingConfig `yaml:"scheduler_ring,omitempty"`
+	UseSchedulerRing bool                `yaml:"use_scheduler_ring"`
+	SchedulerRing    lokiutil.RingConfig `yaml:"scheduler_ring,omitempty"`
 }

 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -107,7 +121,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	cfg.QuerierForgetDelay = 0
 	cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f)
 	f.BoolVar(&cfg.UseSchedulerRing, "query-scheduler.use-scheduler-ring", false, "Set to true to have the query scheduler create a ring and the frontend and frontend_worker use this ring to get the addresses of the query schedulers. If frontend_address and scheduler_address are not present in the config this value will be toggled by Loki to true")
-	cfg.SchedulerRing.RegisterFlags(f)
+	cfg.SchedulerRing.RegisterFlagsWithPrefix("query-scheduler.", "schedulers/", f)
 }

 // NewScheduler creates a new Scheduler.
@@ -160,7 +174,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
 	if err != nil {
 		return nil, errors.Wrap(err, "create KV store client")
 	}
-	lifecyclerCfg, err := cfg.SchedulerRing.ToLifecyclerConfig()
+	lifecyclerCfg, err := cfg.SchedulerRing.ToLifecyclerConfig(ringNumTokens)
 	if err != nil {
 		return nil, errors.Wrap(err, "invalid ring lifecycler config")
 	}
@@ -172,13 +186,13 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
 	delegate = ring.NewTokensPersistencyDelegate(cfg.SchedulerRing.TokensFilePath, ring.JOINING, delegate, log)
 	delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.SchedulerRing.HeartbeatTimeout, delegate, log)

-	s.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, RingNameForServer, RingKey, ringStore, delegate, log, registerer)
+	s.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, ringKey, ringStore, delegate, log, registerer)
 	if err != nil {
 		return nil, errors.Wrap(err, "create ring lifecycler")
 	}

-	ringCfg := cfg.SchedulerRing.ToRingConfig()
-	s.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, RingNameForServer, RingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy())
+	ringCfg := cfg.SchedulerRing.ToRingConfig(ringReplicationFactor)
+	s.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, ringNameForServer, ringKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy())
 	if err != nil {
 		return nil, errors.Wrap(err, "create ring client")
 	}
@@ -656,7 +670,7 @@ func (s *Scheduler) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc rin
 	}

 	takenTokens := ringDesc.GetTokens()
-	newTokens := ring.GenerateTokens(RingNumTokens-len(tokens), takenTokens)
+	newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens)

 	// Tokens sorting will be enforced by the parent caller.
 	tokens = append(tokens, newTokens...)
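The shared config keeps each consumer to three touch points: flag registration under the component's own prefix, a lifecycler config carrying the token count, and a ring config carrying the replication factor. A hedged sketch of that consumer-side wiring (the helper below is illustrative; the real call sites also wire up a KV client, delegates, and a Prometheus registerer):

```go
// buildRingConfigs derives the two ring configs a component needs from the
// shared util.RingConfig. numTokens and replicationFactor are the component's
// leader-election constants (both 1 for the scheduler and the compactor).
func buildRingConfigs(cfg util.RingConfig, numTokens, replicationFactor int) (ring.BasicLifecyclerConfig, ring.Config, error) {
	// One token per instance; it exists purely for leader election.
	lcCfg, err := cfg.ToLifecyclerConfig(numTokens)
	if err != nil {
		return ring.BasicLifecyclerConfig{}, ring.Config{}, err
	}
	// Replication factor 1 so lookups return exactly one instance.
	return lcCfg, cfg.ToRingConfig(replicationFactor), nil
}
```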
diff --git a/pkg/scheduler/scheduler_ring.go b/pkg/scheduler/scheduler_ring.go deleted file mode 100644 index 8f41a6138d46..000000000000 --- a/pkg/scheduler/scheduler_ring.go +++ /dev/null @@ -1,110 +0,0 @@ -package scheduler - -import ( - "flag" - "fmt" - "os" - "time" - - "github.com/go-kit/log/level" - "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" - - "github.com/cortexproject/cortex/pkg/ring" - util_log "github.com/cortexproject/cortex/pkg/util/log" -) - -const ( - // RingKey is the key under which we store the store gateways ring in the KVStore. - RingKey = "scheduler" - - // RingNameForServer is the name of the ring used by the store gateway server. - RingNameForServer = "scheduler" - - // RingNameForClient is the name of the ring used by the store gateway client (we need - // a different name to avoid clashing Prometheus metrics when running in single-binary). - RingNameForClient = "scheduler-client" - - // We use a safe default instead of exposing to config option to the user - // in order to simplify the config. - RingNumTokens = 512 -) - -// RingConfig masks the ring lifecycler config which contains -// many options not really required by the distributors ring. This config -// is used to strip down the config to the minimum, and avoid confusion -// to the user. -type RingConfig struct { - KVStore kv.Config `yaml:"kvstore"` - HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` - HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` - TokensFilePath string `yaml:"tokens_file_path"` - ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` - - // Instance details - InstanceID string `yaml:"instance_id"` - InstanceInterfaceNames []string `yaml:"instance_interface_names"` - InstancePort int `yaml:"instance_port"` - InstanceAddr string `yaml:"instance_addr"` - InstanceZone string `yaml:"instance_availability_zone"` - - // Injected internally - ListenPort int `yaml:"-"` - - ObservePeriod time.Duration `yaml:"-"` -} - -// RegisterFlags adds the flags required to config this to the given FlagSet -func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { - hostname, err := os.Hostname() - if err != nil { - level.Error(util_log.Logger).Log("msg", "failed to get hostname", "err", err) - os.Exit(1) - } - - // Ring flags - cfg.KVStore.RegisterFlagsWithPrefix("scheduler.ring.", "schedulers/", f) - f.DurationVar(&cfg.HeartbeatPeriod, "scheduler.ring.heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.") - f.DurationVar(&cfg.HeartbeatTimeout, "scheduler.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which schedulers are considered unhealthy within the ring. 0 = never (timeout disabled).") - f.StringVar(&cfg.TokensFilePath, "scheduler.ring.tokens-file-path", "", "File path where tokens are stored. 
If empty, tokens are not stored at shutdown and restored at startup.")
-	f.BoolVar(&cfg.ZoneAwarenessEnabled, "scheduler.ring.zone-awareness-enabled", false, "True to enable zone-awareness and replicate blocks across different availability zones.")
-
-	// Instance flags
-	cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
-	f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), "scheduler.ring.instance-interface-names", "Name of network interface to read address from.")
-	f.StringVar(&cfg.InstanceAddr, "scheduler.ring.instance-addr", "", "IP address to advertise in the ring.")
-	f.IntVar(&cfg.InstancePort, "scheduler.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).")
-	f.StringVar(&cfg.InstanceID, "scheduler.ring.instance-id", hostname, "Instance ID to register in the ring.")
-	f.StringVar(&cfg.InstanceZone, "scheduler.ring.instance-availability-zone", "", "The availability zone where this instance is running. Required if zone-awareness is enabled.")
-}
-
-// ToLifecyclerConfig returns a LifecyclerConfig based on the scheduler ring config.
-func (cfg *RingConfig) ToLifecyclerConfig() (ring.BasicLifecyclerConfig, error) {
-	instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames)
-	if err != nil {
-		return ring.BasicLifecyclerConfig{}, err
-	}
-
-	instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort)
-
-	return ring.BasicLifecyclerConfig{
-		ID:                  cfg.InstanceID,
-		Addr:                fmt.Sprintf("%s:%d", instanceAddr, instancePort),
-		Zone:                cfg.InstanceZone,
-		HeartbeatPeriod:     cfg.HeartbeatPeriod,
-		TokensObservePeriod: 0,
-		NumTokens:           RingNumTokens,
-	}, nil
-}
-
-func (cfg *RingConfig) ToRingConfig() ring.Config {
-	rc := ring.Config{}
-	flagext.DefaultValues(&rc)
-
-	rc.KVStore = cfg.KVStore
-	rc.HeartbeatTimeout = cfg.HeartbeatTimeout
-	rc.ZoneAwarenessEnabled = cfg.ZoneAwarenessEnabled
-	rc.ReplicationFactor = 2
-
-	return rc
-}

diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go
index 2da08add030a..f9aa4a904f22 100644
--- a/pkg/storage/stores/shipper/compactor/compactor.go
+++ b/pkg/storage/stores/shipper/compactor/compactor.go
@@ -27,6 +27,7 @@ import (
 	"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
 	shipper_storage "github.com/grafana/loki/pkg/storage/stores/shipper/storage"
 	shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
+	"github.com/grafana/loki/pkg/util"
 )

 const (
@@ -34,21 +35,34 @@ const (
 	// in the ring will be automatically removed.
 	ringAutoForgetUnhealthyPeriods = 10

-	// RingKeyOfLeader is a somewhat arbitrary ID to pull from the ring to see who will be elected the leader
-	RingKeyOfLeader = 100
+	// ringKey is the key under which we store the compactor ring in the KVStore.
+	ringKey = "compactor"
+
+	// ringNameForServer is the name of the ring used by the compactor server.
+	ringNameForServer = "compactor"
+
+	// ringKeyOfLeader is a somewhat arbitrary ID to pull from the ring to see who will be elected the leader
+	ringKeyOfLeader = 0
+
+	// ringReplicationFactor should be 1 because we only want to pull back one node from the ring.
+	ringReplicationFactor = 1
+
+	// ringNumTokens sets our single token in the ring;
+	// we only need to insert 1 token to be used for leader election purposes.
+	ringNumTokens = 1
 )

 type Config struct {
-	WorkingDirectory          string        `yaml:"working_directory"`
-	SharedStoreType           string        `yaml:"shared_store"`
-	SharedStoreKeyPrefix      string        `yaml:"shared_store_key_prefix"`
-	CompactionInterval        time.Duration `yaml:"compaction_interval"`
-	RetentionEnabled          bool          `yaml:"retention_enabled"`
-	RetentionDeleteDelay      time.Duration `yaml:"retention_delete_delay"`
-	RetentionDeleteWorkCount  int           `yaml:"retention_delete_worker_count"`
-	DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
-	MaxCompactionParallelism  int           `yaml:"max_compaction_parallelism"`
-	CompactorRing             RingConfig    `yaml:"compactor_ring,omitempty"`
+	WorkingDirectory          string          `yaml:"working_directory"`
+	SharedStoreType           string          `yaml:"shared_store"`
+	SharedStoreKeyPrefix      string          `yaml:"shared_store_key_prefix"`
+	CompactionInterval        time.Duration   `yaml:"compaction_interval"`
+	RetentionEnabled          bool            `yaml:"retention_enabled"`
+	RetentionDeleteDelay      time.Duration   `yaml:"retention_delete_delay"`
+	RetentionDeleteWorkCount  int             `yaml:"retention_delete_worker_count"`
+	DeleteRequestCancelPeriod time.Duration   `yaml:"delete_request_cancel_period"`
+	MaxCompactionParallelism  int             `yaml:"max_compaction_parallelism"`
+	CompactorRing             util.RingConfig `yaml:"compactor_ring,omitempty"`
 }

 // RegisterFlags registers flags.
@@ -62,7 +76,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total number of workers to use to delete chunks.")
 	f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete requests until this duration after they are created. Data is deleted only after delete requests are older than this duration. Ideally this should be set to at least 24h.")
 	f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. When increasing this value, make sure the compactor has enough disk space allocated to store and compact that many tables.")
-	cfg.CompactorRing.RegisterFlags(f)
+	cfg.CompactorRing.RegisterFlagsWithPrefix("boltdb.shipper.compactor.", "compactors/", f)
 }

 // Validate verifies the config does not contain inappropriate values
@@ -117,7 +131,7 @@ func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_st
 	if err != nil {
 		return nil, errors.Wrap(err, "create KV store client")
 	}
-	lifecyclerCfg, err := cfg.CompactorRing.ToLifecyclerConfig()
+	lifecyclerCfg, err := cfg.CompactorRing.ToLifecyclerConfig(ringNumTokens)
 	if err != nil {
 		return nil, errors.Wrap(err, "invalid ring lifecycler config")
 	}
@@ -129,13 +143,13 @@ func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_st
 	delegate = ring.NewTokensPersistencyDelegate(cfg.CompactorRing.TokensFilePath, ring.JOINING, delegate, util_log.Logger)
 	delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.CompactorRing.HeartbeatTimeout, delegate, util_log.Logger)

-	compactor.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, RingNameForServer, RingKey, ringStore, delegate, util_log.Logger, r)
+	compactor.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, ringKey, ringStore, delegate, util_log.Logger, r)
 	if err != nil {
 		return nil, errors.Wrap(err, "create ring lifecycler")
 	}

-	ringCfg := cfg.CompactorRing.ToRingConfig()
-	compactor.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, RingNameForServer, RingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy())
+	ringCfg := cfg.CompactorRing.ToRingConfig(ringReplicationFactor)
+	compactor.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, ringNameForServer, ringKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy())
 	if err != nil {
 		return nil, errors.Wrap(err, "create ring client")
 	}
@@ -278,7 +292,7 @@ func (c *Compactor) loop(ctx context.Context) error {
 			return nil
 		case <-syncTicker.C:
 			bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
-			rs, err := c.ring.Get(RingKeyOfLeader, ring.WriteNoExtend, bufDescs, bufHosts, bufZones)
+			rs, err := c.ring.Get(ringKeyOfLeader, ring.WriteNoExtend, bufDescs, bufHosts, bufZones)
 			if err != nil {
 				level.Error(util_log.Logger).Log("msg", "error asking the ring who should run the compactor, will check again", "err", err)
 				continue
 			}
@@ -536,7 +550,7 @@ func (c *Compactor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc rin
 	}

 	takenTokens := ringDesc.GetTokens()
-	newTokens := ring.GenerateTokens(RingNumTokens-len(tokens), takenTokens)
+	newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens)

 	// Tokens sorting will be enforced by the parent caller.
 	tokens = append(tokens, newTokens...)

diff --git a/pkg/storage/stores/shipper/compactor/compactor_ring.go b/pkg/storage/stores/shipper/compactor/compactor_ring.go
deleted file mode 100644
index 5f0b0cf76f3f..000000000000
--- a/pkg/storage/stores/shipper/compactor/compactor_ring.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package compactor
-
-import (
-	"flag"
-	"fmt"
-	"os"
-	"time"
-
-	"github.com/go-kit/log/level"
-	"github.com/grafana/dskit/flagext"
-	"github.com/grafana/dskit/kv"
-
-	"github.com/cortexproject/cortex/pkg/ring"
-	util_log "github.com/cortexproject/cortex/pkg/util/log"
-)
-
-const (
-	// RingKey is the key under which we store the store gateways ring in the KVStore.
- RingKey = "compactor" - - // RingNameForServer is the name of the ring used by the compactor server. - RingNameForServer = "compactor" - - - // We use a safe default instead of exposing to config option to the user - // in order to simplify the config. - RingNumTokens = 1 -) - -// RingConfig masks the ring lifecycler config which contains -// many options not really required by the distributors ring. This config -// is used to strip down the config to the minimum, and avoid confusion -// to the user. -type RingConfig struct { - KVStore kv.Config `yaml:"kvstore"` - HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` - HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` - TokensFilePath string `yaml:"tokens_file_path"` - ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` - - // Instance details - InstanceID string `yaml:"instance_id"` - InstanceInterfaceNames []string `yaml:"instance_interface_names"` - InstancePort int `yaml:"instance_port"` - InstanceAddr string `yaml:"instance_addr"` - InstanceZone string `yaml:"instance_availability_zone"` - - // Injected internally - ListenPort int `yaml:"-"` - - ObservePeriod time.Duration `yaml:"-"` -} - -// RegisterFlags adds the flags required to config this to the given FlagSet -func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { - hostname, err := os.Hostname() - if err != nil { - level.Error(util_log.Logger).Log("msg", "failed to get hostname", "err", err) - os.Exit(1) - } - - // Ring flags - cfg.KVStore.RegisterFlagsWithPrefix("boltdb.shipper.compactor.ring.", "compactors/", f) - f.DurationVar(&cfg.HeartbeatPeriod, "boltdb.shipper.compactor.ring.heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.") - f.DurationVar(&cfg.HeartbeatTimeout, "boltdb.shipper.compactor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which compactors are considered unhealthy within the ring. 0 = never (timeout disabled).") - f.StringVar(&cfg.TokensFilePath, "boltdb.shipper.compactor.ring.tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.") - f.BoolVar(&cfg.ZoneAwarenessEnabled, "boltdb.shipper.compactor.ring.zone-awareness-enabled", false, "True to enable zone-awareness and replicate blocks across different availability zones.") - - // Instance flags - cfg.InstanceInterfaceNames = []string{"eth0", "en0"} - f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), "boltdb.shipper.compactor.ring.instance-interface-names", "Name of network interface to read address from.") - f.StringVar(&cfg.InstanceAddr, "boltdb.shipper.compactor.ring.instance-addr", "", "IP address to advertise in the ring.") - f.IntVar(&cfg.InstancePort, "boltdb.shipper.compactor.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).") - f.StringVar(&cfg.InstanceID, "boltdb.shipper.compactor.ring.instance-id", hostname, "Instance ID to register in the ring.") - f.StringVar(&cfg.InstanceZone, "boltdb.shipper.compactor.ring.instance-availability-zone", "", "The availability zone where this instance is running. Required if zone-awareness is enabled.") -} - -// ToLifecyclerConfig returns a LifecyclerConfig based on the compactor ring config. 
-func (cfg *RingConfig) ToLifecyclerConfig() (ring.BasicLifecyclerConfig, error) {
- instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames)
- if err != nil {
- return ring.BasicLifecyclerConfig{}, err
- }
-
- instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort)
-
- return ring.BasicLifecyclerConfig{
- ID: cfg.InstanceID,
- Addr: fmt.Sprintf("%s:%d", instanceAddr, instancePort),
- Zone: cfg.InstanceZone,
- HeartbeatPeriod: cfg.HeartbeatPeriod,
- TokensObservePeriod: 0,
- NumTokens: RingNumTokens,
- }, nil
-}
-
-func (cfg *RingConfig) ToRingConfig() ring.Config {
- rc := ring.Config{}
- flagext.DefaultValues(&rc)
-
- rc.KVStore = cfg.KVStore
- rc.HeartbeatTimeout = cfg.HeartbeatTimeout
- rc.ZoneAwarenessEnabled = cfg.ZoneAwarenessEnabled
- rc.ReplicationFactor = 1
-
- return rc
-}
diff --git a/pkg/util/ring_config.go b/pkg/util/ring_config.go
new file mode 100644
index 000000000000..a0ae0e6e3e30
--- /dev/null
+++ b/pkg/util/ring_config.go
@@ -0,0 +1,95 @@
+package util
+
+import (
+ "flag"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/go-kit/log/level"
+ "github.com/grafana/dskit/flagext"
+ "github.com/grafana/dskit/kv"
+
+ "github.com/cortexproject/cortex/pkg/ring"
+ util_log "github.com/cortexproject/cortex/pkg/util/log"
+)
+
+// RingConfig masks the ring lifecycler config which contains
+// many options not really required by the distributors ring. This config
+// is used to strip down the config to the minimum, and avoid confusion
+// to the user.
+type RingConfig struct {
+ KVStore kv.Config `yaml:"kvstore"`
+ HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
+ HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
+ TokensFilePath string `yaml:"tokens_file_path"`
+ ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"`
+
+ // Instance details
+ InstanceID string `yaml:"instance_id"`
+ InstanceInterfaceNames []string `yaml:"instance_interface_names"`
+ InstancePort int `yaml:"instance_port"`
+ InstanceAddr string `yaml:"instance_addr"`
+ InstanceZone string `yaml:"instance_availability_zone"`
+
+ // Injected internally
+ ListenPort int `yaml:"-"`
+
+ ObservePeriod time.Duration `yaml:"-"`
+}
+
+// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
+// storePrefix is used to set the path in the KVStore and should end with a /
+func (cfg *RingConfig) RegisterFlagsWithPrefix(flagsPrefix, storePrefix string, f *flag.FlagSet) {
+ hostname, err := os.Hostname()
+ if err != nil {
+ level.Error(util_log.Logger).Log("msg", "failed to get hostname", "err", err)
+ os.Exit(1)
+ }
+
+ // Ring flags
+ cfg.KVStore.RegisterFlagsWithPrefix(flagsPrefix+"ring.", storePrefix, f)
+ f.DurationVar(&cfg.HeartbeatPeriod, flagsPrefix+"ring.heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.")
+ f.DurationVar(&cfg.HeartbeatTimeout, flagsPrefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which compactors are considered unhealthy within the ring. 0 = never (timeout disabled).")
+ f.StringVar(&cfg.TokensFilePath, flagsPrefix+"ring.tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
+ f.BoolVar(&cfg.ZoneAwarenessEnabled, flagsPrefix+"ring.zone-awareness-enabled", false, "True to enable zone-awareness and replicate blocks across different availability zones.")
+
+ // Instance flags
+ cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
+ f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), flagsPrefix+"ring.instance-interface-names", "Name of network interface to read address from.")
+ f.StringVar(&cfg.InstanceAddr, flagsPrefix+"ring.instance-addr", "", "IP address to advertise in the ring.")
+ f.IntVar(&cfg.InstancePort, flagsPrefix+"ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).")
+ f.StringVar(&cfg.InstanceID, flagsPrefix+"ring.instance-id", hostname, "Instance ID to register in the ring.")
+ f.StringVar(&cfg.InstanceZone, flagsPrefix+"ring.instance-availability-zone", "", "The availability zone where this instance is running. Required if zone-awareness is enabled.")
+}
+
+// ToLifecyclerConfig returns a LifecyclerConfig based on the compactor ring config.
+func (cfg *RingConfig) ToLifecyclerConfig(numTokens int) (ring.BasicLifecyclerConfig, error) {
+ instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames)
+ if err != nil {
+ return ring.BasicLifecyclerConfig{}, err
+ }
+
+ instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort)
+
+ return ring.BasicLifecyclerConfig{
+ ID: cfg.InstanceID,
+ Addr: fmt.Sprintf("%s:%d", instanceAddr, instancePort),
+ Zone: cfg.InstanceZone,
+ HeartbeatPeriod: cfg.HeartbeatPeriod,
+ TokensObservePeriod: 0,
+ NumTokens: numTokens,
+ }, nil
+}
+
+func (cfg *RingConfig) ToRingConfig(replicationFactor int) ring.Config {
+ rc := ring.Config{}
+ flagext.DefaultValues(&rc)
+
+ rc.KVStore = cfg.KVStore
+ rc.HeartbeatTimeout = cfg.HeartbeatTimeout
+ rc.ZoneAwarenessEnabled = cfg.ZoneAwarenessEnabled
+ rc.ReplicationFactor = replicationFactor
+
+ return rc
+}
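Moving `RingConfig` into `pkg/util` lets any component with a similar coordination ring reuse the same YAML shape and flag set under its own prefix (the compactor here; the query scheduler's ring follows the same pattern later in this series). A sketch of how a caller would wire it up; the `example.` flag prefix, the `example-ring/` store prefix, and the `main` wrapper are illustrative, not part of the patch:

```go
package main

import (
	"flag"

	"github.com/grafana/loki/pkg/util"
)

func main() {
	var ringCfg util.RingConfig

	// flagsPrefix namespaces the CLI flags (-example.ring.heartbeat-period,
	// -example.ring.instance-id, ...); storePrefix is the KV store key prefix
	// under which ring entries are written and should end with a '/'.
	ringCfg.RegisterFlagsWithPrefix("example.", "example-ring/", flag.CommandLine)
	flag.Parse()
}
```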
From 8f770330c46e46e2a80c453a6af735f3ef56e4f2 Mon Sep 17 00:00:00 2001
From: Edward Welch
Date: Sun, 31 Oct 2021 19:58:14 -0400
Subject: [PATCH 10/14] fix lint

---
 pkg/storage/stores/shipper/compactor/compactor.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go
index f9aa4a904f22..81f57ca06b20 100644
--- a/pkg/storage/stores/shipper/compactor/compactor.go
+++ b/pkg/storage/stores/shipper/compactor/compactor.go
@@ -378,7 +378,6 @@ func (c *Compactor) runCompactions(ctx context.Context) {
 }()
 }
 level.Info(util_log.Logger).Log("msg", "compactor started")
- return
 }

 func (c *Compactor) stopping(_ error) error {

From db074eba2d80d42be7977c5861ebe088c79e7fd9 Mon Sep 17 00:00:00 2001
From: Edward Welch
Date: Sun, 31 Oct 2021 21:20:16 -0400
Subject: [PATCH 11/14] better handling of the tokens file with a way to easily tell all the rings to persist tokens

---
 docs/sources/configuration/_index.md | 4 ++
 pkg/loki/common/common.go | 5 +-
 pkg/loki/config_wrapper.go | 40 +++++++++++--
 pkg/loki/config_wrapper_test.go | 85 +++++++++++++++++++++++++++-
 4 files changed, 126 insertions(+), 8 deletions(-)

diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md
index 3a5b9f6e0efc..b4a9d219cfd2 100644
--- a/docs/sources/configuration/_index.md
+++ b/docs/sources/configuration/_index.md
@@ -2410,6 +2410,10 @@ This way, one doesn't have to replicate configs in multiple places.
 # When defined, the given prefix will be present in front of the endpoint paths.
 [path_prefix: <string>]
+
+# When true, the ingester, compactor and query_scheduler ring tokens will be saved to files in the path_prefix directory.
+# Loki will error if you set this to true and path_prefix is empty.
+[persist_tokens: <boolean> | default = false]
 ```

 ### common_storage_config
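Together these two options let token persistence be enabled in one place instead of per ring. A minimal sketch of the resulting config (paths illustrative):

```yaml
common:
  path_prefix: /loki
  persist_tokens: true
# Each ring then derives its own file under path_prefix:
#   /loki/ingester.tokens, /loki/compactor.tokens, /loki/scheduler.tokens
```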
diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go
index d85d01d0cbda..f791cac2fe24 100644
--- a/pkg/loki/common/common.go
+++ b/pkg/loki/common/common.go
@@ -12,8 +12,9 @@ import (
 // Config holds common config that can be shared between multiple other config sections
 type Config struct {
- PathPrefix string `yaml:"path_prefix"`
- Storage Storage `yaml:"storage"`
+ PathPrefix string `yaml:"path_prefix"`
+ Storage Storage `yaml:"storage"`
+ PersistTokens bool `yaml:"persist_tokens"`
 }

 func (c *Config) RegisterFlags(f *flag.FlagSet) {
diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go
index 78671ec54eae..c6519a59d79d 100644
--- a/pkg/loki/config_wrapper.go
+++ b/pkg/loki/config_wrapper.go
@@ -77,7 +77,9 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
 }

 applyPathPrefixDefaults(r, defaults)
- applyIngesterRingConfig(r, &defaults)
+ if err := applyIngesterRingConfig(r, &defaults); err != nil {
+ return err
+ }

 applyMemberlistConfig(r)

 if err := applyStorageConfig(r, &defaults); err != nil {
@@ -101,7 +103,7 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
 //
 // If the ingester ring has its interface names sets to a value equal to the default (["eth0", en0"]), it will try to append
 // the loopback interface at the end of it.
-func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) {
+func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) error {
 if reflect.DeepEqual(r.Ingester.LifecyclerConfig.InfNames, defaults.Ingester.LifecyclerConfig.InfNames) {
 appendLoopbackInterface(r)
 }
@@ -111,6 +113,12 @@ func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) {
 s := rc.KVStore.Store
 sc := r.Ingester.LifecyclerConfig.RingConfig.KVStore.StoreConfig

+ f, err := tokensFile(r, "ingester.tokens")
+ if err != nil {
+ return err
+ }
+ r.Ingester.LifecyclerConfig.TokensFilePath = f
+
 // This gets ugly because we use a separate struct for configuring each ring...

 // Distributor
@@ -143,9 +151,13 @@ func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) {
 r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = lc.InfNames
 r.QueryScheduler.SchedulerRing.InstanceZone = lc.Zone
 r.QueryScheduler.SchedulerRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
- r.QueryScheduler.SchedulerRing.TokensFilePath = lc.TokensFilePath
 r.QueryScheduler.SchedulerRing.KVStore.Store = s
 r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = sc
+ f, err = tokensFile(r, "scheduler.tokens")
+ if err != nil {
+ return err
+ }
+ r.QueryScheduler.SchedulerRing.TokensFilePath = f

 // Compactor
 r.CompactorConfig.CompactorRing.HeartbeatTimeout = rc.HeartbeatTimeout
@@ -156,9 +168,29 @@ func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) {
 r.CompactorConfig.CompactorRing.InstanceInterfaceNames = lc.InfNames
 r.CompactorConfig.CompactorRing.InstanceZone = lc.Zone
 r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
- r.CompactorConfig.CompactorRing.TokensFilePath = lc.TokensFilePath
 r.CompactorConfig.CompactorRing.KVStore.Store = s
 r.CompactorConfig.CompactorRing.KVStore.StoreConfig = sc
+ f, err = tokensFile(r, "compactor.tokens")
+ if err != nil {
+ return err
+ }
+ r.CompactorConfig.CompactorRing.TokensFilePath = f
+ return nil
+}
+
+// tokensFile will create a tokens file with the provided name in the common config path_prefix directory
+// if and only if:
+// * the common config persist_tokens == true
+// * the common config path_prefix is defined.
+func tokensFile(cfg *ConfigWrapper, file string) (string, error) {
+ if !cfg.Common.PersistTokens {
+ return "", nil
+ }
+ if cfg.Common.PathPrefix == "" {
+ return "", errors.New("if persist_tokens is true, path_prefix MUST be defined")
+ }
+
+ return cfg.Common.PathPrefix + "/" + file, nil
+}

 func applyPathPrefixDefaults(r *ConfigWrapper, defaults ConfigWrapper) {
diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go
index 2dad9c47d5c4..0a103a91c5e3 100644
--- a/pkg/loki/config_wrapper_test.go
+++ b/pkg/loki/config_wrapper_test.go
@@ -20,7 +20,9 @@ import (
 cortex_local "github.com/cortexproject/cortex/pkg/ruler/rulestore/local"
 cortex_swift "github.com/cortexproject/cortex/pkg/storage/bucket/swift"

+ "github.com/grafana/loki/pkg/loki/common"
 "github.com/grafana/loki/pkg/storage/chunk/storage"
+ "github.com/grafana/loki/pkg/util"
 "github.com/grafana/loki/pkg/util/cfg"
 loki_net "github.com/grafana/loki/pkg/util/net"
 )
@@ -910,9 +912,62 @@ func TestDefaultUnmarshal(t *testing.T) {

 func Test_applyIngesterRingConfig(t *testing.T) {

- msgf := "%s has changed, this is a crude attempt to catch mapping errors missed in config_wrapper.applyIngesterRingConfig when a ring config changes. Please add a new mapping and update the expected value in this test."
+ t.Run("Attempt to catch changes to a RingConfig", func(t *testing.T) {
+ msgf := "%s has changed, this is a crude attempt to catch mapping errors missed in config_wrapper.applyIngesterRingConfig when a ring config changes. Please add a new mapping and update the expected value in this test."

- assert.Equal(t, 8, reflect.TypeOf(distributor.RingConfig{}).NumField(), fmt.Sprintf(msgf, reflect.TypeOf(distributor.RingConfig{}).String()))
+ assert.Equal(t, 8, reflect.TypeOf(distributor.RingConfig{}).NumField(), fmt.Sprintf(msgf, reflect.TypeOf(distributor.RingConfig{}).String()))
+ assert.Equal(t, 12, reflect.TypeOf(util.RingConfig{}).NumField(), fmt.Sprintf(msgf, reflect.TypeOf(util.RingConfig{}).String()))
+ })
+
+ t.Run("compactor and scheduler tokens file should not be configured if persist_tokens is false", func(t *testing.T) {
+ yamlContent := `
+common:
+  path_prefix: /loki
+`
+ config, _, err := configWrapperFromYAML(t, yamlContent, []string{})
+ assert.NoError(t, err)
+
+ assert.Equal(t, "", config.Ingester.LifecyclerConfig.TokensFilePath)
+ assert.Equal(t, "", config.CompactorConfig.CompactorRing.TokensFilePath)
+ assert.Equal(t, "", config.QueryScheduler.SchedulerRing.TokensFilePath)
+ })
+
+ t.Run("tokens files should be set from common config when persist_tokens is true and path_prefix is defined", func(t *testing.T) {
+ yamlContent := `
+common:
+  persist_tokens: true
+  path_prefix: /loki
+`
+ config, _, err := configWrapperFromYAML(t, yamlContent, []string{})
+ assert.NoError(t, err)
+
+ assert.Equal(t, "/loki/ingester.tokens", config.Ingester.LifecyclerConfig.TokensFilePath)
+ assert.Equal(t, "/loki/compactor.tokens", config.CompactorConfig.CompactorRing.TokensFilePath)
+ assert.Equal(t, "/loki/scheduler.tokens", config.QueryScheduler.SchedulerRing.TokensFilePath)
+ })
+
+ t.Run("common config ignored if actual values set", func(t *testing.T) {
+ yamlContent := `
+ingester:
+  lifecycler:
+    tokens_file_path: /loki/toookens
+compactor:
+  compactor_ring:
+    tokens_file_path: /foo/tokens
+query_scheduler:
+  scheduler_ring:
+    tokens_file_path: /sched/tokes
+common:
+  persist_tokens: true
+  path_prefix: /loki
+`
+ config, _, err := configWrapperFromYAML(t, yamlContent, []string{})
+ assert.NoError(t, err)
+
+ assert.Equal(t, "/loki/toookens", config.Ingester.LifecyclerConfig.TokensFilePath)
+ assert.Equal(t, "/foo/tokens", config.CompactorConfig.CompactorRing.TokensFilePath)
+ assert.Equal(t, "/sched/tokes", config.QueryScheduler.SchedulerRing.TokensFilePath)
+ })
 }

@@ -1015,3 +1070,29 @@ func TestLoopbackAppendingToFrontendV2(t *testing.T) {
 assert.Equal(t, []string{"otheriface"}, config.Frontend.FrontendV2.InfNames)
 })
 }
+
+func Test_tokensFile(t *testing.T) {
+ tests := []struct {
+ name string
+ cfg *ConfigWrapper
+ file string
+ want string
+ wantErr bool
+ }{
+ {"persist_tokens false, path_prefix empty", &ConfigWrapper{Config: Config{Common: common.Config{PathPrefix: "", PersistTokens: false}}}, "ingester.tokens", "", false},
+ {"persist_tokens true, path_prefix empty", &ConfigWrapper{Config: Config{Common: common.Config{PathPrefix: "", PersistTokens: true}}}, "ingester.tokens", "", true},
+ {"persist_tokens true, path_prefix set", &ConfigWrapper{Config: Config{Common: common.Config{PathPrefix: "/loki", PersistTokens: true}}}, "ingester.tokens", "/loki/ingester.tokens", false},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, err := tokensFile(tt.cfg, tt.file)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("tokensFile() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if got != tt.want {
+ t.Errorf("tokensFile() got = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
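The tests above also pin down precedence: a `tokens_file_path` set explicitly on a ring always wins over the path derived from the common config. For example, with the following (illustrative) config only the ingester uses a custom path, while the compactor and scheduler fall back to `/loki/compactor.tokens` and `/loki/scheduler.tokens`:

```yaml
common:
  path_prefix: /loki
  persist_tokens: true
ingester:
  lifecycler:
    tokens_file_path: /data/ingester.tokens
```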
From 6b7111ec5019c49f5b75a086fd687e1744400e13 Mon Sep 17 00:00:00 2001
From: Edward Welch
Date: Mon, 1 Nov 2021 09:21:02 -0400
Subject: [PATCH 12/14] fix query scheduler replication factor

---
 pkg/scheduler/scheduler.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 75999fdd5567..7f05131204a8 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -49,8 +49,8 @@ const (
 // ringNameForServer is the name of the ring used by the compactor server.
 ringNameForServer = "scheduler"

- // ringReplicationFactor should be 1 because we only want to pull back one node from the Ring
- ringReplicationFactor = 1
+ // ringReplicationFactor should be 2 because we want 2 schedulers.
+ ringReplicationFactor = 2

 // ringNumTokens sets our single token in the ring,
 // we only need to insert 1 token to be used for leader election purposes.

From dbf196ba1b69638220d819bae8c5da3dce6fc32c Mon Sep 17 00:00:00 2001
From: Edward Welch
Date: Mon, 1 Nov 2021 10:46:32 -0400
Subject: [PATCH 13/14] removing "scheduler" from a log line in code that may be called by components other than a scheduler in the future.

---
 pkg/util/ring_watcher.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/util/ring_watcher.go b/pkg/util/ring_watcher.go
index ee9779ab6ad1..918239243f18 100644
--- a/pkg/util/ring_watcher.go
+++ b/pkg/util/ring_watcher.go
@@ -86,12 +86,12 @@ func (w *ringWatcher) lookupAddresses() {
 }

 for _, ta := range toAdd {
- level.Debug(w.log).Log("msg", fmt.Sprintf("adding connection to scheduler at address: %s", ta))
+ level.Debug(w.log).Log("msg", fmt.Sprintf("adding connection to address: %s", ta))
 w.notifications.AddressAdded(ta)
 }

 for _, tr := range toRemove {
- level.Debug(w.log).Log("msg", fmt.Sprintf("removing connection to scheduler at address: %s", tr))
+ level.Debug(w.log).Log("msg", fmt.Sprintf("removing connection to address: %s", tr))
 w.notifications.AddressRemoved(tr)
 }

From 584a47cb5ee4caa0d7b1e1fe3264e5a6afcca5c4 Mon Sep 17 00:00:00 2001
From: Edward Welch
Date: Mon, 1 Nov 2021 10:48:27 -0400
Subject: [PATCH 14/14] update changelog

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index ce185695c864..f2c1eb712f07 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@
 * [4543](https://github.com/grafana/loki/pull/4543) **trevorwhitney**: Change more default values and improve application of common storage config
 * [4570](https://github.com/grafana/loki/pull/4570) **DylanGuedes**: Loki: Append loopback to ingester net interface default list
 * [4594](https://github.com/grafana/loki/pull/4594) **owen-d**: Configures unordered_writes=true by default
+* [4574](https://github.com/grafana/loki/pull/4574) **slim-bean**: Loki: Add a ring to the compactor used to control concurrency when not running standalone

 # 2.3.0 (2021/08/06)