Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki: Add a ring to the compactor used to control concurrency when not running standalone #4574

Merged
merged 14 commits into from
Nov 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* [4543](https://github.com/grafana/loki/pull/4543) **trevorwhitney**: Change more default values and improve application of common storage config
* [4570](https://github.com/grafana/loki/pull/4570) **DylanGuedes**: Loki: Append loopback to ingester net interface default list
* [4594](https://github.com/grafana/loki/pull/4594) **owen-d**: Configures unordered_writes=true by default
* [4574](https://github.com/grafana/loki/pull/4574) **slim-bean**: Loki: Add a ring to the compactor used to control concurrency when not running standalone

# 2.3.0 (2021/08/06)

Expand Down
97 changes: 97 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1903,33 +1903,126 @@ compacts index shards to more performant forms.

```yaml
# Directory where files can be downloaded for compaction.
# CLI flag: -boltdb.shipper.compactor.working-directory
[working_directory: <string>]

# The shared store used for storing boltdb files.
# Supported types: gcs, s3, azure, swift, filesystem.
# CLI flag: -boltdb.shipper.compactor.shared-store
[shared_store: <string>]

# Prefix to add to object keys in shared store.
# Path separator(if any) should always be a '/'.
# Prefix should never start with a separator but should always end with it.
# CLI flag: -boltdb.shipper.compactor.shared-store.key-prefix
[shared_store_key_prefix: <string> | default = "index/"]

# Interval at which to re-run the compaction operation (or retention if enabled).
# CLI flag: -boltdb.shipper.compactor.compaction-interval
[compaction_interval: <duration> | default = 10m]

# (Experimental) Activate custom (per-stream,per-tenant) retention.
# CLI flag: -boltdb.shipper.compactor.retention-enabled
[retention_enabled: <bool> | default = false]

# Delay after which chunks will be fully deleted during retention.
# CLI flag: -boltdb.shipper.compactor.retention-delete-delay
[retention_delete_delay: <duration> | default = 2h]

# The total amount of worker to use to delete chunks.
# CLI flag: -boltdb.shipper.compactor.retention-delete-worker-count
[retention_delete_worker_count: <int> | default = 150]

# Allow cancellation of delete request until duration after they are created.
# Data would be deleted only after delete requests have been older than this duration.
# Ideally this should be set to at least 24h.
# CLI flag: -boltdb.shipper.compactor.delete-request-cancel-period
[delete_request_cancel_period: <duration> | default = 24h]

# Maximum number of tables to compact in parallel.
# While increasing this value, please make sure compactor has enough disk space
# allocated to be able to store and compact as many tables.
# CLI flag: -boltdb.shipper.compactor.max-compaction-parallelism
[max_compaction_parallelism: <int> | default = 1]

# The hash ring configuration used by compactors to elect a single instance for running compactions
compactor_ring:
# The key-value store used to share the hash ring across multiple instances.
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
# inmemory, memberlist, multi.
# CLI flag: -boltdb.shipper.compactor.ring.store
[store: <string> | default = "memberlist"]

# The prefix for the keys in the store. Should end with a /.
# CLI flag: -boltdb.shipper.compactor.ring.prefix
[prefix: <string> | default = "compactors/"]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: boltdb.shipper.compactor.ring
[consul: <consul_config>]

# The etcd_config configures the etcd client.
# The CLI flags prefix for this block config is: boltdb.shipper.compactor.ring
[etcd: <etcd_config>]

multi:
# Primary backend storage used by multi-client.
# CLI flag: -boltdb.shipper.compactor.ring.multi.primary
[primary: <string> | default = ""]

# Secondary backend storage used by multi-client.
# CLI flag: -boltdb.shipper.compactor.ring.multi.secondary
[secondary: <string> | default = ""]

# Mirror writes to secondary store.
# CLI flag: -boltdb.shipper.compactor.ring.multi.mirror-enabled
[mirror_enabled: <boolean> | default = false]

# Timeout for storing value to secondary store.
# CLI flag: -boltdb.shipper.compactor.ring.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]

# Interval between heartbeats sent to the ring. 0 = disabled.
# CLI flag: -boltdb.shipper.compactor.ring.heartbeat-period
[heartbeat_period: <duration> | default = 15s]

# The heartbeat timeout after which store gateways are considered unhealthy
# within the ring. 0 = never (timeout disabled). This option needs be set both
# on the store-gateway and querier when running in microservices mode.
# CLI flag: -boltdb.shipper.compactor.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

# File path where tokens are stored. If empty, tokens are neither stored at
# shutdown nor restored at startup.
# CLI flag: -boltdb.shipper.compactor.ring.tokens-file-path
[tokens_file_path: <string> | default = ""]

# True to enable zone-awareness and replicate blocks across different
# availability zones.
# CLI flag: -boltdb.shipper.compactor.ring.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]

# Name of network interface to read addresses from.
# CLI flag: -boltdb.shipper.compactor.ring.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]

# IP address to advertise in the ring.
# CLI flag: -boltdb.shipper.compactor.ring.instance-addr
[instance_addr: <list of string> | default = first from instance_interface_names]

# Port to advertise in the ring
# CLI flag: -boltdb.shipper.compactor.ring.instance-port
[instance_port: <list of string> | default = server.grpc-listen-port]

# Instance ID to register in the ring.
# CLI flag: -boltdb.shipper.compactor.ring.instance-id
[instance_id: <list of string> | default = os.Hostname()]

# The availability zone where this instance is running. Required if
# zone-awareness is enabled.
# CLI flag: -boltdb.shipper.compactor.ring.instance-availability-zone
[instance_availability_zone: <string> | default = ""]
```

## limits_config
Expand Down Expand Up @@ -2317,6 +2410,10 @@ This way, one doesn't have to replicate configs in multiple places.

# When defined, the given prefix will be present in front of the endpoint paths.
[path_prefix: <string>]

# When true, the ingester, compactor and query_scheduler ring tokens will be saved to files in the path_prefix directory
# Loki will error if you set this to true and path_prefix is empty.
[persist_tokens: <boolean>: default = false]
```

### common_storage_config
Expand Down
5 changes: 3 additions & 2 deletions pkg/loki/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (

// Config holds common config that can be shared between multiple other config sections
type Config struct {
PathPrefix string `yaml:"path_prefix"`
Storage Storage `yaml:"storage"`
PathPrefix string `yaml:"path_prefix"`
Storage Storage `yaml:"storage"`
PersistTokens bool `yaml:"persist_tokens"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
Expand Down
52 changes: 49 additions & 3 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
}

applyPathPrefixDefaults(r, defaults)
applyIngesterRingConfig(r, &defaults)
if err := applyIngesterRingConfig(r, &defaults); err != nil {
return err
}
applyMemberlistConfig(r)

if err := applyStorageConfig(r, &defaults); err != nil {
Expand All @@ -101,7 +103,7 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
//
// If the ingester ring has its interface names sets to a value equal to the default (["eth0", en0"]), it will try to append
// the loopback interface at the end of it.
func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) {
func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) error {
if reflect.DeepEqual(r.Ingester.LifecyclerConfig.InfNames, defaults.Ingester.LifecyclerConfig.InfNames) {
appendLoopbackInterface(r)
}
Expand All @@ -111,6 +113,12 @@ func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) {
s := rc.KVStore.Store
sc := r.Ingester.LifecyclerConfig.RingConfig.KVStore.StoreConfig

f, err := tokensFile(r, "ingester.tokens")
if err != nil {
return err
}
r.Ingester.LifecyclerConfig.TokensFilePath = f

// This gets ugly because we use a separate struct for configuring each ring...

// Distributor
Expand Down Expand Up @@ -143,9 +151,46 @@ func applyIngesterRingConfig(r *ConfigWrapper, defaults *ConfigWrapper) {
r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = lc.InfNames
r.QueryScheduler.SchedulerRing.InstanceZone = lc.Zone
r.QueryScheduler.SchedulerRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.QueryScheduler.SchedulerRing.TokensFilePath = lc.TokensFilePath
r.QueryScheduler.SchedulerRing.KVStore.Store = s
r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = sc
f, err = tokensFile(r, "scheduler.tokens")
if err != nil {
return err
}
r.QueryScheduler.SchedulerRing.TokensFilePath = f

// Compactor
slim-bean marked this conversation as resolved.
Show resolved Hide resolved
r.CompactorConfig.CompactorRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.CompactorConfig.CompactorRing.HeartbeatPeriod = lc.HeartbeatPeriod
r.CompactorConfig.CompactorRing.InstancePort = lc.Port
r.CompactorConfig.CompactorRing.InstanceAddr = lc.Addr
r.CompactorConfig.CompactorRing.InstanceID = lc.ID
r.CompactorConfig.CompactorRing.InstanceInterfaceNames = lc.InfNames
r.CompactorConfig.CompactorRing.InstanceZone = lc.Zone
r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.CompactorConfig.CompactorRing.KVStore.Store = s
r.CompactorConfig.CompactorRing.KVStore.StoreConfig = sc
f, err = tokensFile(r, "compactor.tokens")
if err != nil {
return err
}
r.CompactorConfig.CompactorRing.TokensFilePath = f
return nil
}

// tokensFile will create a tokens file with the provided name in the common config /tokens directory
// if and only if:
// * the common config persist_tokens == true
// * the common config path_prefix is defined.
func tokensFile(cfg *ConfigWrapper, file string) (string, error) {
if !cfg.Common.PersistTokens {
return "", nil
}
if cfg.Common.PathPrefix == "" {
return "", errors.New("if persist_tokens is true, path_prefix MUST be defined")
}

return cfg.Common.PathPrefix + "/" + file, nil
}

func applyPathPrefixDefaults(r *ConfigWrapper, defaults ConfigWrapper) {
Expand Down Expand Up @@ -184,6 +229,7 @@ func applyMemberlistConfig(r *ConfigWrapper) {
r.Distributor.DistributorRing.KVStore.Store = memberlistStr
r.Ruler.Ring.KVStore.Store = memberlistStr
r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr
r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr
}
}

Expand Down
85 changes: 83 additions & 2 deletions pkg/loki/config_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
cortex_local "github.com/cortexproject/cortex/pkg/ruler/rulestore/local"
cortex_swift "github.com/cortexproject/cortex/pkg/storage/bucket/swift"

"github.com/grafana/loki/pkg/loki/common"
"github.com/grafana/loki/pkg/storage/chunk/storage"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/cfg"
loki_net "github.com/grafana/loki/pkg/util/net"
)
Expand Down Expand Up @@ -910,9 +912,62 @@ func TestDefaultUnmarshal(t *testing.T) {

func Test_applyIngesterRingConfig(t *testing.T) {

msgf := "%s has changed, this is a crude attempt to catch mapping errors missed in config_wrapper.applyIngesterRingConfig when a ring config changes. Please add a new mapping and update the expected value in this test."
t.Run("Attempt to catch changes to a RingConfig", func(t *testing.T) {
msgf := "%s has changed, this is a crude attempt to catch mapping errors missed in config_wrapper.applyIngesterRingConfig when a ring config changes. Please add a new mapping and update the expected value in this test."

assert.Equal(t, 8, reflect.TypeOf(distributor.RingConfig{}).NumField(), fmt.Sprintf(msgf, reflect.TypeOf(distributor.RingConfig{}).String()))
assert.Equal(t, 8, reflect.TypeOf(distributor.RingConfig{}).NumField(), fmt.Sprintf(msgf, reflect.TypeOf(distributor.RingConfig{}).String()))
assert.Equal(t, 12, reflect.TypeOf(util.RingConfig{}).NumField(), fmt.Sprintf(msgf, reflect.TypeOf(util.RingConfig{}).String()))
})

t.Run("compactor and scheduler tokens file should not be configured if persist_tokens is false", func(t *testing.T) {
yamlContent := `
common:
path_prefix: /loki
`
config, _, err := configWrapperFromYAML(t, yamlContent, []string{})
assert.NoError(t, err)

assert.Equal(t, "", config.Ingester.LifecyclerConfig.TokensFilePath)
assert.Equal(t, "", config.CompactorConfig.CompactorRing.TokensFilePath)
assert.Equal(t, "", config.QueryScheduler.SchedulerRing.TokensFilePath)
})

t.Run("tokens files should be set from common config when persist_tokens is true and path_prefix is defined", func(t *testing.T) {
yamlContent := `
common:
persist_tokens: true
path_prefix: /loki
`
config, _, err := configWrapperFromYAML(t, yamlContent, []string{})
assert.NoError(t, err)

assert.Equal(t, "/loki/ingester.tokens", config.Ingester.LifecyclerConfig.TokensFilePath)
assert.Equal(t, "/loki/compactor.tokens", config.CompactorConfig.CompactorRing.TokensFilePath)
assert.Equal(t, "/loki/scheduler.tokens", config.QueryScheduler.SchedulerRing.TokensFilePath)
})

t.Run("common config ignored if actual values set", func(t *testing.T) {
yamlContent := `
ingester:
lifecycler:
tokens_file_path: /loki/toookens
compactor:
compactor_ring:
tokens_file_path: /foo/tokens
query_scheduler:
scheduler_ring:
tokens_file_path: /sched/tokes
common:
persist_tokens: true
path_prefix: /loki
`
config, _, err := configWrapperFromYAML(t, yamlContent, []string{})
assert.NoError(t, err)

assert.Equal(t, "/loki/toookens", config.Ingester.LifecyclerConfig.TokensFilePath)
assert.Equal(t, "/foo/tokens", config.CompactorConfig.CompactorRing.TokensFilePath)
assert.Equal(t, "/sched/tokes", config.QueryScheduler.SchedulerRing.TokensFilePath)
})

}

Expand Down Expand Up @@ -1015,3 +1070,29 @@ func TestLoopbackAppendingToFrontendV2(t *testing.T) {
assert.Equal(t, []string{"otheriface"}, config.Frontend.FrontendV2.InfNames)
})
}

func Test_tokensFile(t *testing.T) {
tests := []struct {
name string
cfg *ConfigWrapper
file string
want string
wantErr bool
}{
{"persist_tokens false, path_prefix empty", &ConfigWrapper{Config: Config{Common: common.Config{PathPrefix: "", PersistTokens: false}}}, "ingester.tokens", "", false},
{"persist_tokens true, path_prefix empty", &ConfigWrapper{Config: Config{Common: common.Config{PathPrefix: "", PersistTokens: true}}}, "ingester.tokens", "", true},
{"persist_tokens true, path_prefix set", &ConfigWrapper{Config: Config{Common: common.Config{PathPrefix: "/loki", PersistTokens: true}}}, "ingester.tokens", "/loki/ingester.tokens", false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tokensFile(tt.cfg, tt.file)
if (err != nil) != tt.wantErr {
t.Errorf("tokensFile() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("tokensFile() got = %v, want %v", got, tt.want)
}
})
}
}
10 changes: 2 additions & 8 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,8 @@ func (t *Loki) setupModuleManager() error {
Compactor: {Server, Overrides},
IndexGateway: {Server},
IngesterQuerier: {Ring},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler},
Read: {QueryScheduler, QueryFrontend, Querier, Ruler},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor},
Read: {QueryScheduler, QueryFrontend, Querier, Ruler, Compactor},
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reasons for putting the compactor on the read path:

  • having consistent performance on all write nodes is more critical than read nodes
  • the compactor is a read path optimization and is not critical to writes

Write: {Ingester, Distributor},
}

Expand All @@ -463,12 +463,6 @@ func (t *Loki) setupModuleManager() error {
deps[Store] = append(deps[Store], IngesterQuerier)
}

// If we are running Loki with boltdb-shipper as a single binary, without clustered mode(which should always be the case when using inmemory ring),
// we should start compactor as well for better user experience.
if storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Store == "inmemory" {
deps[All] = append(deps[All], Compactor)
}

// If the query scheduler and querier are running together, make sure the scheduler goes
// first to initialize the ring that will also be used by the querier
if (t.Cfg.isModuleEnabled(Querier) && t.Cfg.isModuleEnabled(QueryScheduler)) || t.Cfg.isModuleEnabled(Read) || t.Cfg.isModuleEnabled(All) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,15 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
}

func (t *Loki) initCompactor() (services.Service, error) {
// Set some config sections from other config sections in the config struct
t.Cfg.CompactorConfig.CompactorRing.ListenPort = t.Cfg.Server.GRPCListenPort
slim-bean marked this conversation as resolved.
Show resolved Hide resolved
t.Cfg.CompactorConfig.CompactorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV

if !loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
level.Info(util_log.Logger).Log("msg", "Not using boltdb-shipper index, not starting compactor")
return nil, nil
}

err := t.Cfg.SchemaConfig.Load()
if err != nil {
return nil, err
Expand All @@ -661,6 +670,7 @@ func (t *Loki) initCompactor() (services.Service, error) {
return nil, err
}

t.Server.HTTP.Handle("/compactor/ring", t.compactor)
if t.Cfg.CompactorConfig.RetentionEnabled {
t.Server.HTTP.Path("/loki/api/admin/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
t.Server.HTTP.Path("/loki/api/admin/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
Expand Down
Loading