Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(upgrade): ensure query config written by influxd upgrade is valid #21321

Merged
merged 5 commits into from
Apr 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
1. [19811](https://github.com/influxdata/influxdb/pull/19811): Add Geo graph type to be able to store in Dashboard cells.
1. [21218](https://github.com/influxdata/influxdb/pull/21218): Add the properties of a static legend for line graphs and band plots.

### Bug Fixes

1. [21321](https://github.com/influxdata/influxdb/pull/21321): Ensure query config written by influxd upgrade is valid.

## v2.0.5 [2021-04-27]

### Windows Support
Expand Down
12 changes: 6 additions & 6 deletions cmd/influxd/launcher/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ const (
// NewInfluxdCommand constructs the root of the influxd CLI, along with a `run` subcommand.
// The `run` subcommand is set as the default to execute.
func NewInfluxdCommand(ctx context.Context, v *viper.Viper) (*cobra.Command, error) {
o := newOpts(v)
cliOpts := o.bindCliOpts()
o := NewOpts(v)
cliOpts := o.BindCliOpts()

prog := cli.Program{
Name: "influxd",
Expand Down Expand Up @@ -168,8 +168,8 @@ type InfluxdOpts struct {
Viper *viper.Viper
}

// newOpts constructs options with default values.
func newOpts(viper *viper.Viper) *InfluxdOpts {
// NewOpts constructs options with default values.
func NewOpts(viper *viper.Viper) *InfluxdOpts {
dir, err := fs.InfluxDir()
if err != nil {
panic(fmt.Errorf("failed to determine influx directory: %v", err))
Expand Down Expand Up @@ -216,9 +216,9 @@ func newOpts(viper *viper.Viper) *InfluxdOpts {
}
}

// bindCliOpts returns a list of options which can be added to a cobra command
// BindCliOpts returns a list of options which can be added to a cobra command
// in order to set options over the CLI.
func (o *InfluxdOpts) bindCliOpts() []cli.Opt {
func (o *InfluxdOpts) BindCliOpts() []cli.Opt {
return []cli.Opt{
{
DestP: &o.LogLevel,
Expand Down
35 changes: 19 additions & 16 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,9 +673,8 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
onboardSvc = tenant.NewOnboardingLogger(onboardingLogger, onboardSvc) // with logging

var (
authorizerV1 platform.AuthorizerV1
passwordV1 platform.PasswordsService
authSvcV1 *authv1.Service
passwordV1 platform.PasswordsService
authSvcV1 *authv1.Service
)
{
authStore, err := authv1.NewStore(m.kvStore)
Expand All @@ -686,13 +685,6 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {

authSvcV1 = authv1.NewService(authStore, ts)
passwordV1 = authv1.NewCachingPasswordsService(authSvcV1)

authorizerV1 = &authv1.Authorizer{
AuthV1: authSvcV1,
AuthV2: authSvc,
Comparer: passwordV1,
User: ts,
}
}

var (
Expand Down Expand Up @@ -736,12 +728,19 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
BucketFinder: ts.BucketService,
LogBucketName: platform.MonitoringSystemBucketName,
},
DeleteService: deleteService,
BackupService: backupService,
RestoreService: restoreService,
AuthorizationService: authSvc,
AuthorizerV1: authorizerV1,
AlgoWProxy: &http.NoopProxyHandler{},
DeleteService: deleteService,
BackupService: backupService,
RestoreService: restoreService,
AuthorizationService: authSvc,
AuthorizationV1Service: authSvcV1,
PasswordV1Service: passwordV1,
AuthorizerV1: &authv1.Authorizer{
AuthV1: authSvcV1,
AuthV2: authSvc,
Comparer: passwordV1,
User: ts,
},
AlgoWProxy: &http.NoopProxyHandler{},
// Wrap the BucketService in a storage backed one that will ensure deleted buckets are removed from the storage engine.
BucketService: ts.BucketService,
SessionService: sessionSvc,
Expand Down Expand Up @@ -1140,6 +1139,10 @@ func (m *Launcher) AuthorizationService() platform.AuthorizationService {
return m.apibackend.AuthorizationService
}

func (m *Launcher) AuthorizationV1Service() platform.AuthorizationService {
return m.apibackend.AuthorizationV1Service
}

// SecretService returns the internal secret service.
func (m *Launcher) SecretService() platform.SecretService {
return m.apibackend.SecretService
Expand Down
2 changes: 1 addition & 1 deletion cmd/influxd/launcher/launcher_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (tl *TestLauncher) RunOrFail(tb testing.TB, ctx context.Context, setters ..
// Run executes the program with additional arguments to set paths and ports.
// Passed arguments will overwrite/add to the default ones.
func (tl *TestLauncher) Run(tb zaptest.TestingT, ctx context.Context, setters ...OptSetter) error {
opts := newOpts(viper.New())
opts := NewOpts(viper.New())
if !tl.realServer {
opts.StoreType = "memory"
opts.Testing = true
Expand Down
92 changes: 50 additions & 42 deletions cmd/influxd/upgrade/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (
"golang.org/x/text/transform"
)

// configMapRules is a map of transformation rules
var configMapRules = map[string]string{
// passthroughConfigRules maps v1 config key-names to corresponding v2 config key-names.
// The values of these configs will not be modified during the upgrade process.
var passthroughConfigRules = map[string]string{
"reporting-disabled": "reporting-disabled",
"data.dir": "engine-path",
"data.wal-fsync-delay": "storage-wal-fsync-delay",
Expand All @@ -43,31 +44,6 @@ var configMapRules = map[string]string{
"http.bind-address": "http-bind-address",
"http.https-certificate": "tls-cert",
"http.https-private-key": "tls-key",
"http.pprof-enabled": "pprof-disabled",
}

// configValueTransforms is a map from 2.x config keys to transformation functions
// that should run on the 1.x values before they're written into the 2.x config.
var configValueTransforms = map[string]func(interface{}) interface{}{
// Transform config values of 0 into 10 (the new default).
// query-concurrency used to accept 0 as a representation of infinity,
// but the 2.x controller now forces a positive value to be chosen
// for the parameter.
"query-concurrency": func(v interface{}) interface{} {
ret := v
if i, ok := v.(int64); ok && i == 0 {
ret = 10
}
return ret
},
// Flip the boolean (1.x tracked 'enabled', 2.x tracks 'disabled').
"pprof-disabled": func(v interface{}) interface{} {
ret := v
if b, ok := v.(bool); ok {
ret = !b
}
return ret
},
}

func loadV1Config(configFile string) (*configV1, *map[string]interface{}, error) {
Expand Down Expand Up @@ -120,9 +96,8 @@ func load(path string) ([]byte, error) {
func upgradeConfig(v1Config map[string]interface{}, targetOptions optionsV2, log *zap.Logger) error {
// create and initialize helper
cu := &configUpgrader{
rules: configMapRules,
valueTransforms: configValueTransforms,
log: log,
rules: passthroughConfigRules,
log: log,
}

// rewrite config options from V1 to V2 paths
Expand All @@ -137,9 +112,8 @@ func upgradeConfig(v1Config map[string]interface{}, targetOptions optionsV2, log

// configUpgrader is a helper used by `upgrade-config` command.
type configUpgrader struct {
rules map[string]string
valueTransforms map[string]func(interface{}) interface{}
log *zap.Logger
rules map[string]string
log *zap.Logger
}

func (cu *configUpgrader) updateV2Config(config map[string]interface{}, targetOptions optionsV2) {
Expand Down Expand Up @@ -168,36 +142,70 @@ func (cu *configUpgrader) save(config map[string]interface{}, path string) error

// Credits: @rogpeppe (Roger Peppe)

func (cu *configUpgrader) transform(x map[string]interface{}) map[string]interface{} {
func (cu *configUpgrader) transform(v1Config map[string]interface{}) map[string]interface{} {
res := make(map[string]interface{})
for old, new := range cu.rules {
val, ok := cu.lookup(x, old)
if ok {
if transform, ok := cu.valueTransforms[new]; ok {
val = transform(val)
}
if val, ok := cu.lookup(v1Config, old); ok {
res[new] = val
}
}

// Special case: flip the value for pprof.
if val, ok := cu.lookup(v1Config, "http.pprof-enabled"); ok {
if b, ok := val.(bool); ok {
res["pprof-disabled"] = !b
}
}

// Special case: ensure query settings are valid.
fixQueryLimits(res)

return res
}

func (cu *configUpgrader) lookup(x map[string]interface{}, path string) (interface{}, bool) {
// fixQueryLimits ensures that all query-related config settings are compatible
// with the upgraded value of the 'query-concurrency' setting.
func fixQueryLimits(v2Config map[string]interface{}) {
concurrencyVal, ok := v2Config["query-concurrency"]
if !ok {
return
}
var concurrency int64
switch c := concurrencyVal.(type) {
case int:
concurrency = int64(c)
case int32:
concurrency = int64(c)
case int64:
concurrency = c
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these cases are paranoia on my part; I'm not sure when the config-loader would pick each type of int.

default:
concurrency = 0
}
if concurrency == 0 {
// The upgrade process doesn't generate a value for query-queue-size, so if
// query-concurrency is 0 / unset then it's safe to leave query-queue-size unset.
return
}

// When query-concurrency is > 0, query-queue-size must also be > 0.
v2Config["query-queue-size"] = concurrency
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a risk here that:

  • Somebody will pick a huge value for query-concurrency
  • We write the same value into query-queue-size
  • At startup, the server uses the huge value for query-queue-size as the size of a new chan, consuming a lot of memory all at once

I'm assuming the risk is low now that we support 0 for unlimited, so most people who would be writing huge concurrency values can now write 0 instead.

}

func (cu *configUpgrader) lookup(v1Config map[string]interface{}, path string) (interface{}, bool) {
for {
elem := path
rest := ""
if i := strings.Index(path, "."); i != -1 {
elem, rest = path[0:i], path[i+1:]
}
val, ok := x[elem]
val, ok := v1Config[elem]
if rest == "" {
return val, ok
}
child, ok := val.(map[string]interface{})
if !ok {
return nil, false
}
path, x = rest, child
path, v1Config = rest, child
}
}
19 changes: 18 additions & 1 deletion cmd/influxd/upgrade/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ func TestConfigUpgrade(t *testing.T) {
config1x: testConfigV1obsoleteArrays,
config2x: testConfigV2obsoleteArrays,
},
{
name: "query concurrency",
config1x: testConfigV1QueryConcurrency,
config2x: testConfigV2QueryConcurrency,
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -398,6 +403,11 @@ reporting-disabled = true
var testConfigV1empty = `
`

var testConfigV1QueryConcurrency = `
[coordinator]
max-concurrent-queries = 128
`

// 2.x test configs

var testConfigV2minimal = `reporting-disabled = false
Expand All @@ -422,7 +432,7 @@ influxql-max-select-buckets = 0
influxql-max-select-point = 0
influxql-max-select-series = 0
log-level = "info"
query-concurrency = 10
query-concurrency = 0
storage-cache-max-memory-size = 1073741824
storage-cache-snapshot-memory-size = 26214400
storage-cache-snapshot-write-cold-duration = "10m0s"
Expand Down Expand Up @@ -453,3 +463,10 @@ var testConfigV2empty = `
bolt-path = "/db/.influxdbv2/influxd.bolt"
engine-path = "/db/.influxdbv2/engine"
`

var testConfigV2QueryConcurrency = `
bolt-path = "/db/.influxdbv2/influxd.bolt"
engine-path = "/db/.influxdbv2/engine"
query-concurrency = 128
query-queue-size = 128
`
Loading