Skip to content

Commit

Permalink
Refactor of commands and flag parsing for sidecar (#2267)
Browse files Browse the repository at this point in the history
Signed-off-by: Philip Gough <philip.p.gough@gmail.com>
  • Loading branch information
philipgough authored May 11, 2020
1 parent 71460e3 commit a384c43
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 87 deletions.
127 changes: 127 additions & 0 deletions cmd/thanos/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package main

import (
"net/url"
"time"

"github.com/prometheus/common/model"
"gopkg.in/alecthomas/kingpin.v2"
)

type grpcConfig struct {
bindAddress string
gracePeriod model.Duration
tlsSrvCert string
tlsSrvKey string
tlsSrvClientCA string
}

func (gc *grpcConfig) registerFlag(cmd *kingpin.CmdClause) *grpcConfig {
cmd.Flag("grpc-address",
"Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components.").
Default("0.0.0.0:10901").StringVar(&gc.bindAddress)
cmd.Flag("grpc-grace-period",
"Time to wait after an interrupt received for GRPC Server.").
Default("2m").SetValue(&gc.gracePeriod)
cmd.Flag("grpc-server-tls-cert",
"TLS Certificate for gRPC server, leave blank to disable TLS").
Default("").StringVar(&gc.tlsSrvCert)
cmd.Flag("grpc-server-tls-key",
"TLS Key for the gRPC server, leave blank to disable TLS").
Default("").StringVar(&gc.tlsSrvKey)
cmd.Flag("grpc-server-tls-client-ca",
"TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert)").
Default("").StringVar(&gc.tlsSrvClientCA)
return gc
}

type httpConfig struct {
bindAddress string
gracePeriod model.Duration
}

func (hc *httpConfig) registerFlag(cmd *kingpin.CmdClause) *httpConfig {
cmd.Flag("http-address",
"Listen host:port for HTTP endpoints.").
Default("0.0.0.0:10902").StringVar(&hc.bindAddress)
cmd.Flag("http-grace-period",
"Time to wait after an interrupt received for HTTP Server.").
Default("2m").SetValue(&hc.gracePeriod)
return hc
}

type prometheusConfig struct {
url *url.URL
readyTimeout time.Duration
}

func (pc *prometheusConfig) registerFlag(cmd *kingpin.CmdClause) *prometheusConfig {
cmd.Flag("prometheus.url",
"URL at which to reach Prometheus's API. For better performance use local network.").
Default("http://localhost:9090").URLVar(&pc.url)
cmd.Flag("prometheus.ready_timeout",
"Maximum time to wait for the Prometheus instance to start up").
Default("10m").DurationVar(&pc.readyTimeout)
return pc
}

type connConfig struct {
maxIdleConns int
maxIdleConnsPerHost int
}

func (cc *connConfig) registerFlag(cmd *kingpin.CmdClause) *connConfig {
cmd.Flag("receive.connection-pool-size",
"Controls the http MaxIdleConns. Default is 0, which is unlimited").
IntVar(&cc.maxIdleConns)
cmd.Flag("receive.connection-pool-size-per-host",
"Controls the http MaxIdleConnsPerHost").
Default("100").IntVar(&cc.maxIdleConnsPerHost)
return cc
}

type tsdbConfig struct {
path string
}

func (tc *tsdbConfig) registerFlag(cmd *kingpin.CmdClause) *tsdbConfig {
cmd.Flag("tsdb.path", "Data directory of TSDB.").Default("./data").StringVar(&tc.path)
return tc
}

type reloaderConfig struct {
confFile string
envVarConfFile string
ruleDirectories []string
}

func (rc *reloaderConfig) registerFlag(cmd *kingpin.CmdClause) *reloaderConfig {
cmd.Flag("reloader.config-file",
"Config file watched by the reloader.").
Default("").StringVar(&rc.confFile)
cmd.Flag("reloader.config-envsubst-file",
"Output file for environment variable substituted config file.").
Default("").StringVar(&rc.envVarConfFile)
cmd.Flag("reloader.rule-dir",
"Rule directories for the reloader to refresh (repeated field).").
StringsVar(&rc.ruleDirectories)
return rc
}

type shipperConfig struct {
uploadCompacted bool
ignoreBlockSize bool
}

func (sc *shipperConfig) registerFlag(cmd *kingpin.CmdClause) *shipperConfig {
cmd.Flag("shipper.upload-compacted",
"If true sidecar will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus. Do it once and then disable the flag when done.").
Default("false").BoolVar(&sc.uploadCompacted)
cmd.Flag("shipper.ignore-unequal-block-size",
"If true sidecar will not require prometheus min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled on your Prometheus instance, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").
Default("false").Hidden().BoolVar(&sc.ignoreBlockSize)
return sc
}
3 changes: 1 addition & 2 deletions cmd/thanos/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (
"fmt"
"strings"

"github.com/thanos-io/thanos/pkg/extflag"

"github.com/prometheus/common/model"
"github.com/thanos-io/thanos/pkg/extflag"
"gopkg.in/alecthomas/kingpin.v2"
)

Expand Down
138 changes: 53 additions & 85 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,72 +44,27 @@ import (

func registerSidecar(m map[string]setupFunc, app *kingpin.Application) {
cmd := app.Command(component.Sidecar.String(), "sidecar for Prometheus server")

httpBindAddr, httpGracePeriod := regHTTPFlags(cmd)
grpcBindAddr, grpcGracePeriod, grpcCert, grpcKey, grpcClientCA := regGRPCFlags(cmd)

promURL := cmd.Flag("prometheus.url", "URL at which to reach Prometheus's API. For better performance use local network.").
Default("http://localhost:9090").URL()

promReadyTimeout := cmd.Flag("prometheus.ready_timeout", "Maximum time to wait for the Prometheus instance to start up").
Default("10m").Duration()

connectionPoolSize := cmd.Flag("receive.connection-pool-size", "Controls the http MaxIdleConns. Default is 0, which is unlimited").Int()
connectionPoolSizePerHost := cmd.Flag("receive.connection-pool-size-per-host", "Controls the http MaxIdleConnsPerHost").Default("100").Int()

dataDir := cmd.Flag("tsdb.path", "Data directory of TSDB.").
Default("./data").String()

reloaderCfgFile := cmd.Flag("reloader.config-file", "Config file watched by the reloader.").
Default("").String()

reloaderCfgOutputFile := cmd.Flag("reloader.config-envsubst-file", "Output file for environment variable substituted config file.").
Default("").String()

reloaderRuleDirs := cmd.Flag("reloader.rule-dir", "Rule directories for the reloader to refresh (repeated field).").Strings()

objStoreConfig := regCommonObjStoreFlags(cmd, "", false)

uploadCompacted := cmd.Flag("shipper.upload-compacted", "If true sidecar will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus. Do it once and then disable the flag when done.").Default("false").Bool()

ignoreBlockSize := cmd.Flag("shipper.ignore-unequal-block-size", "If true sidecar will not require prometheus min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled on your Prometheus instance, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").Default("false").Hidden().Bool()

minTime := thanosmodel.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))
conf := &sidecarConfig{}
conf.registerFlag(cmd)

m[component.Sidecar.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
rl := reloader.New(
log.With(logger, "component", "reloader"),
extprom.WrapRegistererWithPrefix("thanos_sidecar_", reg),
reloader.ReloadURLFromBase(*promURL),
*reloaderCfgFile,
*reloaderCfgOutputFile,
*reloaderRuleDirs,
reloader.ReloadURLFromBase(conf.prometheus.url),
conf.reloader.confFile,
conf.reloader.envVarConfFile,
conf.reloader.ruleDirectories,
)

return runSidecar(
g,
logger,
reg,
tracer,
*grpcBindAddr,
time.Duration(*grpcGracePeriod),
*grpcCert,
*grpcKey,
*grpcClientCA,
*httpBindAddr,
time.Duration(*httpGracePeriod),
*promURL,
*promReadyTimeout,
*dataDir,
objStoreConfig,
rl,
*uploadCompacted,
*ignoreBlockSize,
component.Sidecar,
*minTime,
*connectionPoolSize,
*connectionPoolSizePerHost,
*conf,
)
}
}
Expand All @@ -119,37 +74,22 @@ func runSidecar(
logger log.Logger,
reg *prometheus.Registry,
tracer opentracing.Tracer,
grpcBindAddr string,
grpcGracePeriod time.Duration,
grpcCert string,
grpcKey string,
grpcClientCA string,
httpBindAddr string,
httpGracePeriod time.Duration,
promURL *url.URL,
promReadyTimeout time.Duration,
dataDir string,
objStoreConfig *extflag.PathOrContent,
reloader *reloader.Reloader,
uploadCompacted bool,
ignoreBlockSize bool,
comp component.Component,
limitMinTime thanosmodel.TimeOrDurationValue,
connectionPoolSize int,
connectionPoolSizePerHost int,
conf sidecarConfig,
) error {
var m = &promMetadata{
promURL: promURL,
promURL: conf.prometheus.url,

// Start out with the full time range. The shipper will constrain it later.
// TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled.
mint: limitMinTime.PrometheusTimestamp(),
mint: conf.limitMinTime.PrometheusTimestamp(),
maxt: math.MaxInt64,

limitMinTime: limitMinTime,
limitMinTime: conf.limitMinTime,
}

confContentYaml, err := objStoreConfig.Content()
confContentYaml, err := conf.objStore.Content()
if err != nil {
return errors.Wrap(err, "getting object store config")
}
Expand All @@ -169,8 +109,8 @@ func runSidecar(
)

srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
httpserver.WithListen(conf.http.bindAddress),
httpserver.WithGracePeriod(time.Duration(conf.http.gracePeriod)),
)

g.Add(func() error {
Expand Down Expand Up @@ -200,7 +140,7 @@ func runSidecar(
// Only check Prometheus's flags when upload is enabled.
if uploads {
// Check prometheus's flags to ensure sane sidecar flags.
if err := validatePrometheus(ctx, logger, ignoreBlockSize, m); err != nil {
if err := validatePrometheus(ctx, logger, conf.shipper.ignoreBlockSize, m); err != nil {
return errors.Wrap(err, "validate Prometheus flags")
}
}
Expand Down Expand Up @@ -266,23 +206,24 @@ func runSidecar(

{
t := exthttp.NewTransport()
t.MaxIdleConnsPerHost = connectionPoolSizePerHost
t.MaxIdleConns = connectionPoolSize
t.MaxIdleConnsPerHost = conf.connection.maxIdleConnsPerHost
t.MaxIdleConns = conf.connection.maxIdleConns
c := &http.Client{Transport: tracing.HTTPTripperware(logger, t)}

promStore, err := store.NewPrometheusStore(logger, c, promURL, component.Sidecar, m.Labels, m.Timestamps)
promStore, err := store.NewPrometheusStore(logger, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}

tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA)
tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"),
conf.grpc.tlsSrvCert, conf.grpc.tlsSrvKey, conf.grpc.tlsSrvClientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithListen(conf.grpc.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
)
g.Add(func() error {
Expand All @@ -309,14 +250,15 @@ func runSidecar(
}
}()

if err := promclient.IsWALDirAccessible(dataDir); err != nil {
if err := promclient.IsWALDirAccessible(conf.tsdb.path); err != nil {
level.Error(logger).Log("err", err)
}

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

promReadyTimeout := conf.prometheus.readyTimeout
extLabelsCtx, cancel := context.WithTimeout(ctx, promReadyTimeout)
defer cancel()

Expand All @@ -330,10 +272,10 @@ func runSidecar(
}

var s *shipper.Shipper
if uploadCompacted {
s = shipper.NewWithCompacted(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource)
if conf.shipper.uploadCompacted {
s = shipper.NewWithCompacted(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource)
} else {
s = shipper.New(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource)
s = shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource)
}

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
Expand Down Expand Up @@ -458,3 +400,29 @@ func (s *promMetadata) Timestamps() (mint int64, maxt int64) {

return s.mint, s.maxt
}

type sidecarConfig struct {
http httpConfig
grpc grpcConfig
prometheus prometheusConfig
connection connConfig
tsdb tsdbConfig
reloader reloaderConfig
objStore extflag.PathOrContent
shipper shipperConfig
limitMinTime thanosmodel.TimeOrDurationValue
}

func (sc *sidecarConfig) registerFlag(cmd *kingpin.CmdClause) *sidecarConfig {
sc.http.registerFlag(cmd)
sc.grpc.registerFlag(cmd)
sc.prometheus.registerFlag(cmd)
sc.connection.registerFlag(cmd)
sc.tsdb.registerFlag(cmd)
sc.reloader.registerFlag(cmd)
sc.objStore = *regCommonObjStoreFlags(cmd, "", false)
sc.shipper.registerFlag(cmd)
cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime)
return sc
}

0 comments on commit a384c43

Please sign in to comment.