-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor of commands and flag parsing #2267
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package main | ||
|
||
import ( | ||
"net/url" | ||
"time" | ||
|
||
"github.com/prometheus/common/model" | ||
"gopkg.in/alecthomas/kingpin.v2" | ||
) | ||
|
||
type grpcConfig struct { | ||
bindAddress string | ||
gracePeriod model.Duration | ||
tlsSrvCert string | ||
tlsSrvKey string | ||
tlsSrvClientCA string | ||
} | ||
|
||
func (gc *grpcConfig) registerFlag(cmd *kingpin.CmdClause) *grpcConfig { | ||
cmd.Flag("grpc-address", | ||
"Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components."). | ||
Default("0.0.0.0:10901").StringVar(&gc.bindAddress) | ||
cmd.Flag("grpc-grace-period", | ||
"Time to wait after an interrupt received for GRPC Server."). | ||
Default("2m").SetValue(&gc.gracePeriod) | ||
cmd.Flag("grpc-server-tls-cert", | ||
"TLS Certificate for gRPC server, leave blank to disable TLS"). | ||
Default("").StringVar(&gc.tlsSrvCert) | ||
cmd.Flag("grpc-server-tls-key", | ||
"TLS Key for the gRPC server, leave blank to disable TLS"). | ||
Default("").StringVar(&gc.tlsSrvKey) | ||
cmd.Flag("grpc-server-tls-client-ca", | ||
"TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert)"). | ||
Default("").StringVar(&gc.tlsSrvClientCA) | ||
return gc | ||
} | ||
|
||
type httpConfig struct { | ||
bindAddress string | ||
gracePeriod model.Duration | ||
} | ||
|
||
func (hc *httpConfig) registerFlag(cmd *kingpin.CmdClause) *httpConfig { | ||
cmd.Flag("http-address", | ||
"Listen host:port for HTTP endpoints."). | ||
Default("0.0.0.0:10902").StringVar(&hc.bindAddress) | ||
cmd.Flag("http-grace-period", | ||
"Time to wait after an interrupt received for HTTP Server."). | ||
Default("2m").SetValue(&hc.gracePeriod) | ||
return hc | ||
} | ||
|
||
type prometheusConfig struct { | ||
url *url.URL | ||
readyTimeout time.Duration | ||
} | ||
|
||
func (pc *prometheusConfig) registerFlag(cmd *kingpin.CmdClause) *prometheusConfig { | ||
cmd.Flag("prometheus.url", | ||
"URL at which to reach Prometheus's API. For better performance use local network."). | ||
Default("http://localhost:9090").URLVar(&pc.url) | ||
cmd.Flag("prometheus.ready_timeout", | ||
"Maximum time to wait for the Prometheus instance to start up"). | ||
Default("10m").DurationVar(&pc.readyTimeout) | ||
return pc | ||
} | ||
|
||
type connConfig struct { | ||
maxIdleConns int | ||
maxIdleConnsPerHost int | ||
} | ||
|
||
func (cc *connConfig) registerFlag(cmd *kingpin.CmdClause) *connConfig { | ||
cmd.Flag("receive.connection-pool-size", | ||
"Controls the http MaxIdleConns. Default is 0, which is unlimited"). | ||
IntVar(&cc.maxIdleConns) | ||
cmd.Flag("receive.connection-pool-size-per-host", | ||
"Controls the http MaxIdleConnsPerHost"). | ||
Default("100").IntVar(&cc.maxIdleConnsPerHost) | ||
return cc | ||
} | ||
|
||
type tsdbConfig struct { | ||
path string | ||
} | ||
|
||
func (tc *tsdbConfig) registerFlag(cmd *kingpin.CmdClause) *tsdbConfig { | ||
cmd.Flag("tsdb.path", "Data directory of TSDB.").Default("./data").StringVar(&tc.path) | ||
return tc | ||
} | ||
|
||
type reloaderConfig struct { | ||
confFile string | ||
envVarConfFile string | ||
ruleDirectories []string | ||
} | ||
|
||
func (rc *reloaderConfig) registerFlag(cmd *kingpin.CmdClause) *reloaderConfig { | ||
cmd.Flag("reloader.config-file", | ||
"Config file watched by the reloader."). | ||
Default("").StringVar(&rc.confFile) | ||
cmd.Flag("reloader.config-envsubst-file", | ||
"Output file for environment variable substituted config file."). | ||
Default("").StringVar(&rc.envVarConfFile) | ||
cmd.Flag("reloader.rule-dir", | ||
"Rule directories for the reloader to refresh (repeated field)."). | ||
StringsVar(&rc.ruleDirectories) | ||
return rc | ||
} | ||
|
||
type shipperConfig struct { | ||
uploadCompacted bool | ||
ignoreBlockSize bool | ||
} | ||
|
||
func (sc *shipperConfig) registerFlag(cmd *kingpin.CmdClause) *shipperConfig { | ||
cmd.Flag("shipper.upload-compacted", | ||
"If true sidecar will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus. Do it once and then disable the flag when done."). | ||
Default("false").BoolVar(&sc.uploadCompacted) | ||
cmd.Flag("shipper.ignore-unequal-block-size", | ||
"If true sidecar will not require prometheus min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled on your Prometheus instance, as in the worst case it can result in ~2h data loss for your Thanos bucket storage."). | ||
Default("false").Hidden().BoolVar(&sc.ignoreBlockSize) | ||
return sc | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,72 +44,27 @@ import ( | |
|
||
func registerSidecar(m map[string]setupFunc, app *kingpin.Application) { | ||
cmd := app.Command(component.Sidecar.String(), "sidecar for Prometheus server") | ||
|
||
httpBindAddr, httpGracePeriod := regHTTPFlags(cmd) | ||
grpcBindAddr, grpcGracePeriod, grpcCert, grpcKey, grpcClientCA := regGRPCFlags(cmd) | ||
|
||
promURL := cmd.Flag("prometheus.url", "URL at which to reach Prometheus's API. For better performance use local network."). | ||
Default("http://localhost:9090").URL() | ||
|
||
promReadyTimeout := cmd.Flag("prometheus.ready_timeout", "Maximum time to wait for the Prometheus instance to start up"). | ||
Default("10m").Duration() | ||
|
||
connectionPoolSize := cmd.Flag("receive.connection-pool-size", "Controls the http MaxIdleConns. Default is 0, which is unlimited").Int() | ||
connectionPoolSizePerHost := cmd.Flag("receive.connection-pool-size-per-host", "Controls the http MaxIdleConnsPerHost").Default("100").Int() | ||
|
||
dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB."). | ||
Default("./data").String() | ||
|
||
reloaderCfgFile := cmd.Flag("reloader.config-file", "Config file watched by the reloader."). | ||
Default("").String() | ||
|
||
reloaderCfgOutputFile := cmd.Flag("reloader.config-envsubst-file", "Output file for environment variable substituted config file."). | ||
Default("").String() | ||
|
||
reloaderRuleDirs := cmd.Flag("reloader.rule-dir", "Rule directories for the reloader to refresh (repeated field).").Strings() | ||
|
||
objStoreConfig := regCommonObjStoreFlags(cmd, "", false) | ||
|
||
uploadCompacted := cmd.Flag("shipper.upload-compacted", "If true sidecar will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus. Do it once and then disable the flag when done.").Default("false").Bool() | ||
|
||
ignoreBlockSize := cmd.Flag("shipper.ignore-unequal-block-size", "If true sidecar will not require prometheus min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled on your Prometheus instance, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").Default("false").Hidden().Bool() | ||
|
||
minTime := thanosmodel.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). | ||
Default("0000-01-01T00:00:00Z")) | ||
conf := &sidecarConfig{} | ||
conf.registerFlag(cmd) | ||
|
||
m[component.Sidecar.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { | ||
rl := reloader.New( | ||
log.With(logger, "component", "reloader"), | ||
extprom.WrapRegistererWithPrefix("thanos_sidecar_", reg), | ||
reloader.ReloadURLFromBase(*promURL), | ||
*reloaderCfgFile, | ||
*reloaderCfgOutputFile, | ||
*reloaderRuleDirs, | ||
reloader.ReloadURLFromBase(conf.prometheus.url), | ||
conf.reloader.confFile, | ||
conf.reloader.envVarConfFile, | ||
conf.reloader.ruleDirectories, | ||
) | ||
|
||
return runSidecar( | ||
g, | ||
logger, | ||
reg, | ||
tracer, | ||
*grpcBindAddr, | ||
time.Duration(*grpcGracePeriod), | ||
*grpcCert, | ||
*grpcKey, | ||
*grpcClientCA, | ||
*httpBindAddr, | ||
time.Duration(*httpGracePeriod), | ||
*promURL, | ||
*promReadyTimeout, | ||
*dataDir, | ||
objStoreConfig, | ||
rl, | ||
*uploadCompacted, | ||
*ignoreBlockSize, | ||
component.Sidecar, | ||
*minTime, | ||
*connectionPoolSize, | ||
*connectionPoolSizePerHost, | ||
*conf, | ||
) | ||
} | ||
} | ||
|
@@ -119,37 +74,22 @@ func runSidecar( | |
logger log.Logger, | ||
reg *prometheus.Registry, | ||
tracer opentracing.Tracer, | ||
grpcBindAddr string, | ||
grpcGracePeriod time.Duration, | ||
grpcCert string, | ||
grpcKey string, | ||
grpcClientCA string, | ||
httpBindAddr string, | ||
httpGracePeriod time.Duration, | ||
promURL *url.URL, | ||
promReadyTimeout time.Duration, | ||
dataDir string, | ||
objStoreConfig *extflag.PathOrContent, | ||
reloader *reloader.Reloader, | ||
uploadCompacted bool, | ||
ignoreBlockSize bool, | ||
comp component.Component, | ||
limitMinTime thanosmodel.TimeOrDurationValue, | ||
connectionPoolSize int, | ||
connectionPoolSizePerHost int, | ||
conf sidecarConfig, | ||
) error { | ||
var m = &promMetadata{ | ||
promURL: promURL, | ||
promURL: conf.prometheus.url, | ||
|
||
// Start out with the full time range. The shipper will constrain it later. | ||
// TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled. | ||
mint: limitMinTime.PrometheusTimestamp(), | ||
mint: conf.limitMinTime.PrometheusTimestamp(), | ||
maxt: math.MaxInt64, | ||
|
||
limitMinTime: limitMinTime, | ||
limitMinTime: conf.limitMinTime, | ||
} | ||
|
||
confContentYaml, err := objStoreConfig.Content() | ||
confContentYaml, err := conf.objStore.Content() | ||
if err != nil { | ||
return errors.Wrap(err, "getting object store config") | ||
} | ||
|
@@ -169,8 +109,8 @@ func runSidecar( | |
) | ||
|
||
srv := httpserver.New(logger, reg, comp, httpProbe, | ||
httpserver.WithListen(httpBindAddr), | ||
httpserver.WithGracePeriod(httpGracePeriod), | ||
httpserver.WithListen(conf.http.bindAddress), | ||
httpserver.WithGracePeriod(time.Duration(conf.http.gracePeriod)), | ||
) | ||
|
||
g.Add(func() error { | ||
|
@@ -200,7 +140,7 @@ func runSidecar( | |
// Only check Prometheus's flags when upload is enabled. | ||
if uploads { | ||
// Check prometheus's flags to ensure sane sidecar flags. | ||
if err := validatePrometheus(ctx, logger, ignoreBlockSize, m); err != nil { | ||
if err := validatePrometheus(ctx, logger, conf.shipper.ignoreBlockSize, m); err != nil { | ||
return errors.Wrap(err, "validate Prometheus flags") | ||
} | ||
} | ||
|
@@ -266,23 +206,24 @@ func runSidecar( | |
|
||
{ | ||
t := exthttp.NewTransport() | ||
t.MaxIdleConnsPerHost = connectionPoolSizePerHost | ||
t.MaxIdleConns = connectionPoolSize | ||
t.MaxIdleConnsPerHost = conf.connection.maxIdleConnsPerHost | ||
t.MaxIdleConns = conf.connection.maxIdleConns | ||
c := &http.Client{Transport: tracing.HTTPTripperware(logger, t)} | ||
|
||
promStore, err := store.NewPrometheusStore(logger, c, promURL, component.Sidecar, m.Labels, m.Timestamps) | ||
promStore, err := store.NewPrometheusStore(logger, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps) | ||
if err != nil { | ||
return errors.Wrap(err, "create Prometheus store") | ||
} | ||
|
||
tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA) | ||
tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), | ||
conf.grpc.tlsSrvCert, conf.grpc.tlsSrvKey, conf.grpc.tlsSrvClientCA) | ||
if err != nil { | ||
return errors.Wrap(err, "setup gRPC server") | ||
} | ||
|
||
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore, | ||
grpcserver.WithListen(grpcBindAddr), | ||
grpcserver.WithGracePeriod(grpcGracePeriod), | ||
grpcserver.WithListen(conf.grpc.bindAddress), | ||
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)), | ||
grpcserver.WithTLSConfig(tlsCfg), | ||
) | ||
g.Add(func() error { | ||
|
@@ -309,14 +250,15 @@ func runSidecar( | |
} | ||
}() | ||
|
||
if err := promclient.IsWALDirAccessible(dataDir); err != nil { | ||
if err := promclient.IsWALDirAccessible(conf.tsdb.path); err != nil { | ||
level.Error(logger).Log("err", err) | ||
} | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
g.Add(func() error { | ||
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") | ||
|
||
promReadyTimeout := conf.prometheus.readyTimeout | ||
extLabelsCtx, cancel := context.WithTimeout(ctx, promReadyTimeout) | ||
defer cancel() | ||
|
||
|
@@ -330,10 +272,10 @@ func runSidecar( | |
} | ||
|
||
var s *shipper.Shipper | ||
if uploadCompacted { | ||
s = shipper.NewWithCompacted(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource) | ||
if conf.shipper.uploadCompacted { | ||
s = shipper.NewWithCompacted(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource) | ||
} else { | ||
s = shipper.New(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource) | ||
s = shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource) | ||
} | ||
|
||
return runutil.Repeat(30*time.Second, ctx.Done(), func() error { | ||
|
@@ -458,3 +400,29 @@ func (s *promMetadata) Timestamps() (mint int64, maxt int64) { | |
|
||
return s.mint, s.maxt | ||
} | ||
|
||
type sidecarConfig struct { | ||
http httpConfig | ||
grpc grpcConfig | ||
prometheus prometheusConfig | ||
connection connConfig | ||
tsdb tsdbConfig | ||
reloader reloaderConfig | ||
objStore extflag.PathOrContent | ||
shipper shipperConfig | ||
limitMinTime thanosmodel.TimeOrDurationValue | ||
} | ||
|
||
func (sc *sidecarConfig) registerFlag(cmd *kingpin.CmdClause) *sidecarConfig { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looking forward for some tool for this, but let's do in separate PR (: |
||
sc.http.registerFlag(cmd) | ||
sc.grpc.registerFlag(cmd) | ||
sc.prometheus.registerFlag(cmd) | ||
sc.connection.registerFlag(cmd) | ||
sc.tsdb.registerFlag(cmd) | ||
sc.reloader.registerFlag(cmd) | ||
sc.objStore = *regCommonObjStoreFlags(cmd, "", false) | ||
sc.shipper.registerFlag(cmd) | ||
cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). | ||
Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime) | ||
return sc | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can now totally remove this item. Instead of registration we can invoke run directly I think 👍