diff --git a/CHANGELOG.md b/CHANGELOG.md index 059eddbcc2..68be8ef781 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Added +- [#1687](https://github.com/thanos-io/thanos/pull/1687) Add a new `--grpc-grace-period` CLI option to components which serve gRPC to set how long to wait until gRPC Server shuts down. - [#1660](https://github.com/thanos-io/thanos/pull/1660) Add a new `--prometheus.ready_timeout` CLI option to the sidecar to set how long to wait until Prometheus starts up. - [#1573](https://github.com/thanos-io/thanos/pull/1573) `AliYun OSS` object storage, see [documents](docs/storage.md#aliyun-oss) for further information. - [#1680](https://github.com/thanos-io/thanos/pull/1680) Add a new `--http-grace-period` CLI option to components which serve HTTP to set how long to wait until HTTP Server shuts down. diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index e4bfc90802..244d8942a1 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -10,6 +10,15 @@ import ( "text/template" "time" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/run" + "github.com/oklog/ulid" + "github.com/olekukonko/tablewriter" + opentracing "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/tsdb/labels" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" @@ -20,19 +29,9 @@ import ( "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/runutil" - "github.com/thanos-io/thanos/pkg/server" + httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/ui" "github.com/thanos-io/thanos/pkg/verifier" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" 
- "github.com/oklog/run" - "github.com/oklog/ulid" - "github.com/olekukonko/tablewriter" - opentracing "github.com/opentracing/opentracing-go" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/tsdb/labels" "golang.org/x/text/language" "golang.org/x/text/message" kingpin "gopkg.in/alecthomas/kingpin.v2" @@ -311,7 +310,7 @@ func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *extflag.PathOrContent) { cmd := root.Command("web", "Web interface for remote storage bucket") bind := cmd.Flag("listen", "HTTP host:port to listen on").Default("0.0.0.0:8080").String() - httpGracePeriod := regHTTPGracePeriodFlag(cmd) + _, httpGracePeriod := regHTTPFlags(cmd) interval := cmd.Flag("refresh", "Refresh interval to download metadata from remote storage").Default("30m").Duration() timeout := cmd.Flag("timeout", "Timeout to download metadata from remote storage").Default("5m").Duration() label := cmd.Flag("label", "Prometheus label to use as timeline title").String() @@ -321,9 +320,9 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str statusProber := prober.NewProber(component.Bucket, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. 
- srv := server.NewHTTP(logger, reg, component.Bucket, statusProber, - server.WithListen(*bind), - server.WithGracePeriod(time.Duration(*httpGracePeriod)), + srv := httpserver.New(logger, reg, component.Bucket, statusProber, + httpserver.WithListen(*bind), + httpserver.WithGracePeriod(time.Duration(*httpGracePeriod)), ) bucketUI := ui.NewBucketUI(logger, *label) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 09ea8fba93..ba09994913 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -10,9 +10,6 @@ import ( "strings" "time" - "github.com/thanos-io/thanos/pkg/extflag" - "github.com/thanos-io/thanos/pkg/server" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" @@ -25,10 +22,12 @@ import ( "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/runutil" + httpserver "github.com/thanos-io/thanos/pkg/server/http" "gopkg.in/alecthomas/kingpin.v2" ) @@ -79,8 +78,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) { "Compaction index verification will ignore out of order label names."). Hidden().Default("false").Bool() - httpAddr := regHTTPAddrFlag(cmd) - httpGracePeriod := regHTTPGracePeriodFlag(cmd) + httpAddr, httpGracePeriod := regHTTPFlags(cmd) dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process compactions."). Default("./data").String() @@ -179,9 +177,9 @@ func runCompact( statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. 
- srv := server.NewHTTP(logger, reg, component, statusProber, - server.WithListen(httpBindAddr), - server.WithGracePeriod(httpGracePeriod), + srv := httpserver.New(logger, reg, component, statusProber, + httpserver.WithListen(httpBindAddr), + httpserver.WithGracePeriod(httpGracePeriod), ) g.Add(srv.ListenAndServe, srv.Shutdown) diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 9e360cec67..655bc20b63 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -6,9 +6,6 @@ import ( "path/filepath" "time" - "github.com/thanos-io/thanos/pkg/extflag" - "github.com/thanos-io/thanos/pkg/server" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" @@ -23,10 +20,12 @@ import ( "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/runutil" + httpserver "github.com/thanos-io/thanos/pkg/server/http" kingpin "gopkg.in/alecthomas/kingpin.v2" ) @@ -34,8 +33,7 @@ func registerDownsample(m map[string]setupFunc, app *kingpin.Application) { comp := component.Downsample cmd := app.Command(comp.String(), "continuously downsamples blocks in an object store bucket") - httpAddr := regHTTPAddrFlag(cmd) - httpGracePeriod := regHTTPGracePeriodFlag(cmd) + httpAddr, httpGracePeriod := regHTTPFlags(cmd) dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings."). Default("./data").String() @@ -126,9 +124,9 @@ func runDownsample( } // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. 
- srv := server.NewHTTP(logger, reg, comp, statusProber, - server.WithListen(httpBindAddr), - server.WithGracePeriod(httpGracePeriod), + srv := httpserver.New(logger, reg, comp, statusProber, + httpserver.WithListen(httpBindAddr), + httpserver.WithGracePeriod(httpGracePeriod), ) g.Add(srv.ListenAndServe, srv.Shutdown) diff --git a/cmd/thanos/flags.go b/cmd/thanos/flags.go index 135552e6f6..b6d7776086 100644 --- a/cmd/thanos/flags.go +++ b/cmd/thanos/flags.go @@ -19,29 +19,31 @@ func modelDuration(flags *kingpin.FlagClause) *model.Duration { func regGRPCFlags(cmd *kingpin.CmdClause) ( grpcBindAddr *string, + grpcGracePeriod *model.Duration, grpcTLSSrvCert *string, grpcTLSSrvKey *string, grpcTLSSrvClientCA *string, ) { grpcBindAddr = cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components."). Default("0.0.0.0:10901").String() + grpcGracePeriod = modelDuration(cmd.Flag("grpc-grace-period", "Time to wait after an interrupt received for GRPC Server.").Default("2m")) // by default it's the same as query.timeout. grpcTLSSrvCert = cmd.Flag("grpc-server-tls-cert", "TLS Certificate for gRPC server, leave blank to disable TLS").Default("").String() grpcTLSSrvKey = cmd.Flag("grpc-server-tls-key", "TLS Key for the gRPC server, leave blank to disable TLS").Default("").String() grpcTLSSrvClientCA = cmd.Flag("grpc-server-tls-client-ca", "TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. 
(tls.NoClientCert)").Default("").String() return grpcBindAddr, + grpcGracePeriod, grpcTLSSrvCert, grpcTLSSrvKey, grpcTLSSrvClientCA } -func regHTTPAddrFlag(cmd *kingpin.CmdClause) *string { - return cmd.Flag("http-address", "Listen host:port for HTTP endpoints.").Default("0.0.0.0:10902").String() -} +func regHTTPFlags(cmd *kingpin.CmdClause) (httpBindAddr *string, httpGracePeriod *model.Duration) { + httpBindAddr = cmd.Flag("http-address", "Listen host:port for HTTP endpoints.").Default("0.0.0.0:10902").String() + httpGracePeriod = modelDuration(cmd.Flag("http-grace-period", "Time to wait after an interrupt received for HTTP Server.").Default("2m")) // The grace period defaults to 2m, the same as the default query.timeout. -func regHTTPGracePeriodFlag(cmd *kingpin.CmdClause) *model.Duration { - return modelDuration(cmd.Flag("http-grace-period", "Time to wait after an interrupt received for HTTP Server.").Default("5s")) + return httpBindAddr, httpGracePeriod } func regCommonObjStoreFlags(cmd *kingpin.CmdClause, suffix string, required bool, extraDesc ...string) *extflag.PathOrContent { diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index 3e93628e9b..e4b7a06e72 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -2,39 +2,25 @@ package main import ( "context" - "crypto/tls" - "crypto/x509" "fmt" "io" - "io/ioutil" - "math" "os" "os/signal" "path/filepath" "runtime" - "runtime/debug" "syscall" gmetrics "github.com/armon/go-metrics" gprom "github.com/armon/go-metrics/prometheus" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/oklog/run" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/version" - "github.com/thanos-io/thanos/pkg/store/storepb" -
"github.com/thanos-io/thanos/pkg/tracing" "github.com/thanos-io/thanos/pkg/tracing/client" "go.uber.org/automaxprocs/maxprocs" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/status" "gopkg.in/alecthomas/kingpin.v2" ) @@ -222,145 +208,3 @@ func interrupt(logger log.Logger, cancel <-chan struct{}) error { return errors.New("canceled") } } - -func defaultGRPCTLSServerOpts(logger log.Logger, cert, key, clientCA string) ([]grpc.ServerOption, error) { - opts := []grpc.ServerOption{} - tlsCfg, err := defaultTLSServerOpts(log.With(logger, "protocol", "gRPC"), cert, key, clientCA) - if err != nil { - return opts, err - } - if tlsCfg != nil { - opts = append(opts, grpc.Creds(credentials.NewTLS(tlsCfg))) - } - return opts, nil -} - -func defaultTLSServerOpts(logger log.Logger, cert, key, clientCA string) (*tls.Config, error) { - if key == "" && cert == "" { - if clientCA != "" { - return nil, errors.New("when a client CA is used a server key and certificate must also be provided") - } - - level.Info(logger).Log("msg", "disabled TLS, key and cert must be set to enable") - return nil, nil - } - - level.Info(logger).Log("msg", "enabling server side TLS") - - if key == "" || cert == "" { - return nil, errors.New("both server key and certificate must be provided") - } - - tlsCfg := &tls.Config{ - MinVersion: tls.VersionTLS12, - } - - tlsCert, err := tls.LoadX509KeyPair(cert, key) - if err != nil { - return nil, errors.Wrap(err, "server credentials") - } - - tlsCfg.Certificates = []tls.Certificate{tlsCert} - - if clientCA != "" { - caPEM, err := ioutil.ReadFile(clientCA) - if err != nil { - return nil, errors.Wrap(err, "reading client CA") - } - - certPool := x509.NewCertPool() - if !certPool.AppendCertsFromPEM(caPEM) { - return nil, errors.Wrap(err, "building client CA") - } - tlsCfg.ClientCAs = certPool - tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert - - level.Info(logger).Log("msg", "server TLS 
client verification enabled") - } - - return tlsCfg, nil -} - -func defaultTLSClientOpts(logger log.Logger, cert, key, caCert, serverName string) (*tls.Config, error) { - var certPool *x509.CertPool - if caCert != "" { - caPEM, err := ioutil.ReadFile(caCert) - if err != nil { - return nil, errors.Wrap(err, "reading client CA") - } - - certPool = x509.NewCertPool() - if !certPool.AppendCertsFromPEM(caPEM) { - return nil, errors.Wrap(err, "building client CA") - } - level.Info(logger).Log("msg", "TLS client using provided certificate pool") - } else { - var err error - certPool, err = x509.SystemCertPool() - if err != nil { - return nil, errors.Wrap(err, "reading system certificate pool") - } - level.Info(logger).Log("msg", "TLS client using system certificate pool") - } - - tlsCfg := &tls.Config{ - RootCAs: certPool, - } - - if serverName != "" { - tlsCfg.ServerName = serverName - } - - if (key != "") != (cert != "") { - return nil, errors.New("both client key and certificate must be provided") - } - - if cert != "" { - cert, err := tls.LoadX509KeyPair(cert, key) - if err != nil { - return nil, errors.Wrap(err, "client credentials") - } - tlsCfg.Certificates = []tls.Certificate{cert} - level.Info(logger).Log("msg", "TLS client authentication enabled") - } - return tlsCfg, nil -} - -func newStoreGRPCServer(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, srv storepb.StoreServer, opts []grpc.ServerOption) *grpc.Server { - met := grpc_prometheus.NewServerMetrics() - met.EnableHandlingTimeHistogram( - grpc_prometheus.WithHistogramBuckets([]float64{ - 0.001, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4, - }), - ) - panicsTotal := prometheus.NewCounter(prometheus.CounterOpts{ - Name: "thanos_grpc_req_panics_recovered_total", - Help: "Total number of gRPC requests recovered from internal panic.", - }) - reg.MustRegister(met, panicsTotal) - - grpcPanicRecoveryHandler := func(p interface{}) (err error) { - panicsTotal.Inc() - 
level.Error(logger).Log("msg", "recovered from panic", "panic", p, "stack", debug.Stack()) - return status.Errorf(codes.Internal, "%s", p) - } - opts = append(opts, - grpc.MaxSendMsgSize(math.MaxInt32), - grpc_middleware.WithUnaryServerChain( - met.UnaryServerInterceptor(), - tracing.UnaryServerInterceptor(tracer), - grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), - ), - grpc_middleware.WithStreamServerChain( - met.StreamServerInterceptor(), - tracing.StreamServerInterceptor(tracer), - grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), - ), - ) - - s := grpc.NewServer(opts...) - storepb.RegisterStoreServer(s, srv) - met.InitializeMetrics(s) - - return s -} diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index c9d93b4feb..0be1e769b0 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "math" - "net" "net/http" "path" "time" @@ -31,8 +30,10 @@ import ( "github.com/thanos-io/thanos/pkg/query" v1 "github.com/thanos-io/thanos/pkg/query/api" "github.com/thanos-io/thanos/pkg/runutil" - "github.com/thanos-io/thanos/pkg/server" + grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" + httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/store" + "github.com/thanos-io/thanos/pkg/tls" "github.com/thanos-io/thanos/pkg/tracing" "github.com/thanos-io/thanos/pkg/ui" "google.golang.org/grpc" @@ -45,9 +46,8 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { comp := component.Query cmd := app.Command(comp.String(), "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes") - httpBindAddr := regHTTPAddrFlag(cmd) - httpGracePeriod := regHTTPGracePeriodFlag(cmd) - grpcBindAddr, srvCert, srvKey, srvClientCA := regGRPCFlags(cmd) + httpBindAddr, httpGracePeriod := regHTTPFlags(cmd) + grpcBindAddr, grpcGracePeriod, grpcCert, grpcKey, 
grpcClientCA := regGRPCFlags(cmd) secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool() cert := cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").String() @@ -133,9 +133,10 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { reg, tracer, *grpcBindAddr, - *srvCert, - *srvKey, - *srvClientCA, + time.Duration(*grpcGracePeriod), + *grpcCert, + *grpcKey, + *grpcClientCA, *secure, *cert, *key, @@ -201,7 +202,7 @@ func storeClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer ope level.Info(logger).Log("msg", "enabling client to server TLS") - tlsCfg, err := defaultTLSClientOpts(logger, cert, key, caCert, serverName) + tlsCfg, err := tls.NewClientConfig(logger, cert, key, caCert, serverName) if err != nil { return nil, err } @@ -216,9 +217,10 @@ func runQuery( reg *prometheus.Registry, tracer opentracing.Tracer, grpcBindAddr string, - srvCert string, - srvKey string, - srvClientCA string, + grpcGracePeriod time.Duration, + grpcCert string, + grpcKey string, + grpcClientCA string, secure bool, cert string, key string, @@ -380,9 +382,9 @@ func runQuery( api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins) // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. - srv := server.NewHTTP(logger, reg, comp, statusProber, - server.WithListen(httpBindAddr), - server.WithGracePeriod(httpGracePeriod), + srv := httpserver.New(logger, reg, comp, statusProber, + httpserver.WithListen(httpBindAddr), + httpserver.WithGracePeriod(httpGracePeriod), ) srv.Handle("/", router) @@ -390,24 +392,23 @@ func runQuery( } // Start query (proxy) gRPC StoreAPI. 
{ - l, err := net.Listen("tcp", grpcBindAddr) + tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA) if err != nil { - return errors.Wrap(err, "listen gRPC on address") + return errors.Wrap(err, "setup gRPC server") } - logger := log.With(logger, "component", component.Query.String()) - opts, err := defaultGRPCTLSServerOpts(logger, srvCert, srvKey, srvClientCA) - if err != nil { - return errors.Wrap(err, "build gRPC server") - } - s := newStoreGRPCServer(logger, reg, tracer, proxy, opts) + s := grpcserver.New(logger, reg, tracer, comp, proxy, + grpcserver.WithListen(grpcBindAddr), + grpcserver.WithGracePeriod(grpcGracePeriod), + grpcserver.WithTLSConfig(tlsCfg), + ) g.Add(func() error { - level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr) statusProber.SetReady() - return errors.Wrap(s.Serve(l), "serve gRPC") - }, func(error) { - s.Stop() + return s.ListenAndServe() + }, func(err error) { + statusProber.SetNotReady(err) + s.Shutdown(err) }) } diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 649b03ef5c..1424a13506 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -3,14 +3,10 @@ package main import ( "context" "fmt" - "net" "os" "strings" "time" - "github.com/thanos-io/thanos/pkg/extflag" - "github.com/thanos-io/thanos/pkg/server" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" @@ -22,13 +18,16 @@ import ( "github.com/prometheus/prometheus/tsdb/labels" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/receive" "github.com/thanos-io/thanos/pkg/runutil" + grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" + httpserver "github.com/thanos-io/thanos/pkg/server/http" 
"github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" - "google.golang.org/grpc" + "github.com/thanos-io/thanos/pkg/tls" kingpin "gopkg.in/alecthomas/kingpin.v2" ) @@ -36,9 +35,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { comp := component.Receive cmd := app.Command(comp.String(), "Accept Prometheus remote write API requests and write to local tsdb (EXPERIMENTAL, this may change drastically without notice)") - httpBindAddr := regHTTPAddrFlag(cmd) - httpGracePeriod := regHTTPGracePeriodFlag(cmd) - grpcBindAddr, grpcCert, grpcKey, grpcClientCA := regGRPCFlags(cmd) + httpBindAddr, httpGracePeriod := regHTTPFlags(cmd) + grpcBindAddr, grpcGracePeriod, grpcCert, grpcKey, grpcClientCA := regGRPCFlags(cmd) rwAddress := cmd.Flag("remote-write.address", "Address to listen on for remote write requests."). Default("0.0.0.0:19291").String() @@ -107,6 +105,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { reg, tracer, *grpcBindAddr, + time.Duration(*grpcGracePeriod), *grpcCert, *grpcKey, *grpcClientCA, @@ -141,6 +140,7 @@ func runReceive( reg *prometheus.Registry, tracer opentracing.Tracer, grpcBindAddr string, + grpcGracePeriod time.Duration, grpcCert string, grpcKey string, grpcClientCA string, @@ -178,11 +178,11 @@ func runReceive( } localStorage := &tsdb.ReadyStorage{} - rwTLSConfig, err := defaultTLSServerOpts(log.With(logger, "protocol", "HTTP"), rwServerCert, rwServerKey, rwServerClientCA) + rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), rwServerCert, rwServerKey, rwServerClientCA) if err != nil { return err } - rwTLSClientConfig, err := defaultTLSClientOpts(logger, rwClientCert, rwClientKey, rwClientServerCA, rwClientServerName) + rwTLSClientConfig, err := tls.NewClientConfig(logger, rwClientCert, rwClientKey, rwClientServerCA, rwClientServerName) if err != nil { return err } @@ -339,42 +339,41 @@ func runReceive( level.Debug(logger).Log("msg", "setting 
up http server") // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. - srv := server.NewHTTP(logger, reg, comp, statusProber, - server.WithListen(httpBindAddr), - server.WithGracePeriod(httpGracePeriod), + srv := httpserver.New(logger, reg, comp, statusProber, + httpserver.WithListen(httpBindAddr), + httpserver.WithGracePeriod(httpGracePeriod), ) g.Add(srv.ListenAndServe, srv.Shutdown) level.Debug(logger).Log("msg", "setting up grpc server") { - var ( - s *grpc.Server - l net.Listener - ) + var s *grpcserver.Server startGRPC := make(chan struct{}) g.Add(func() error { defer close(startGRPC) - opts, err := defaultGRPCTLSServerOpts(logger, grpcCert, grpcKey, grpcClientCA) + + tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA) if err != nil { return errors.Wrap(err, "setup gRPC server") } for range dbReady { if s != nil { - s.Stop() - } - l, err = net.Listen("tcp", grpcBindAddr) - if err != nil { - return errors.Wrap(err, "listen API address") + s.Shutdown(errors.New("reload hashrings")) } tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), nil, localStorage.Get(), component.Receive, lset) - s = newStoreGRPCServer(logger, &receive.UnRegisterer{Registerer: reg}, tracer, tsdbStore, opts) + + s = grpcserver.New(logger, &receive.UnRegisterer{Registerer: reg}, tracer, comp, tsdbStore, + grpcserver.WithListen(grpcBindAddr), + grpcserver.WithGracePeriod(grpcGracePeriod), + grpcserver.WithTLSConfig(tlsCfg), + ) startGRPC <- struct{}{} } return nil - }, func(error) { + }, func(err error) { if s != nil { - s.Stop() + s.Shutdown(err) } }) // We need to be able to start and stop the gRPC server @@ -382,7 +381,7 @@ func runReceive( g.Add(func() error { for range startGRPC { level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr) - if err := s.Serve(l); err != nil { + if err := s.ListenAndServe(); err != nil { return errors.Wrap(err, 
"serve gRPC") } } diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index dedb979b2c..3d0345b2b0 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -17,9 +17,6 @@ import ( "syscall" "time" - "github.com/thanos-io/thanos/pkg/extflag" - "github.com/thanos-io/thanos/pkg/server" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" @@ -41,6 +38,7 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/discovery/cache" "github.com/thanos-io/thanos/pkg/discovery/dns" + "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/objstore/client" @@ -49,9 +47,12 @@ import ( thanosrule "github.com/thanos-io/thanos/pkg/rule" v1 "github.com/thanos-io/thanos/pkg/rule/api" "github.com/thanos-io/thanos/pkg/runutil" + grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" + httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/tls" "github.com/thanos-io/thanos/pkg/tracing" "github.com/thanos-io/thanos/pkg/ui" "gopkg.in/alecthomas/kingpin.v2" @@ -62,9 +63,8 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { comp := component.Rule cmd := app.Command(comp.String(), "ruler evaluating Prometheus rules against given Query nodes, exposing Store API and storing old blocks in bucket") - httpBindAddr := regHTTPAddrFlag(cmd) - httpGracePeriod := regHTTPGracePeriodFlag(cmd) - grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd) + httpBindAddr, httpGracePeriod := regHTTPFlags(cmd) + grpcBindAddr, grpcGracePeriod, grpcCert, grpcKey, grpcClientCA := regGRPCFlags(cmd) labelStrs := cmd.Flag("label", "Labels to be applied to all generated metrics (repeated). 
Similar to external labels for Prometheus, used to identify ruler and its blocks as unique source."). PlaceHolder("=\"\"").Strings() @@ -159,9 +159,10 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { *alertmgrs, *alertmgrsTimeout, *grpcBindAddr, - *cert, - *key, - *clientCA, + time.Duration(*grpcGracePeriod), + *grpcCert, + *grpcKey, + *grpcClientCA, *httpBindAddr, time.Duration(*httpGracePeriod), *webRoutePrefix, @@ -195,9 +196,10 @@ func runRule( alertmgrURLs []string, alertmgrsTimeout time.Duration, grpcBindAddr string, - cert string, - key string, - clientCA string, + grpcGracePeriod time.Duration, + grpcCert string, + grpcKey string, + grpcClientCA string, httpBindAddr string, httpGracePeriod time.Duration, webRoutePrefix string, @@ -502,25 +504,24 @@ func runRule( statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) // Start gRPC server. { - l, err := net.Listen("tcp", grpcBindAddr) - if err != nil { - return errors.Wrap(err, "listen API address") - } - logger := log.With(logger, "component", component.Rule.String()) - store := store.NewTSDBStore(logger, reg, db, component.Rule, lset) - opts, err := defaultGRPCTLSServerOpts(logger, cert, key, clientCA) + tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA) if err != nil { - return errors.Wrap(err, "setup gRPC options") + return errors.Wrap(err, "setup gRPC server") } - s := newStoreGRPCServer(logger, reg, tracer, store, opts) + s := grpcserver.New(logger, reg, tracer, comp, store, + grpcserver.WithListen(grpcBindAddr), + grpcserver.WithGracePeriod(grpcGracePeriod), + grpcserver.WithTLSConfig(tlsCfg), + ) g.Add(func() error { statusProber.SetReady() - return errors.Wrap(s.Serve(l), "serve gRPC") - }, func(error) { - s.Stop() + return s.ListenAndServe() + }, func(err error) { + statusProber.SetNotReady(err) + s.Shutdown(err) }) } // Start UI & metrics HTTP server. 
@@ -552,9 +553,9 @@ func runRule( api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins) // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. - srv := server.NewHTTP(logger, reg, comp, statusProber, - server.WithListen(httpBindAddr), - server.WithGracePeriod(httpGracePeriod), + srv := httpserver.New(logger, reg, comp, statusProber, + httpserver.WithListen(httpBindAddr), + httpserver.WithGracePeriod(httpGracePeriod), ) srv.Handle("/", router) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index e9668065a6..437e729f69 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -3,14 +3,10 @@ package main import ( "context" "math" - "net" "net/url" "sync" "time" - "github.com/thanos-io/thanos/pkg/extflag" - "github.com/thanos-io/thanos/pkg/server" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" @@ -21,24 +17,27 @@ import ( "github.com/prometheus/prometheus/tsdb/labels" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/extflag" thanosmodel "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/reloader" "github.com/thanos-io/thanos/pkg/runutil" + grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" + httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/tls" "gopkg.in/alecthomas/kingpin.v2" ) func registerSidecar(m map[string]setupFunc, app *kingpin.Application) { cmd := app.Command(component.Sidecar.String(), "sidecar for Prometheus server") - httpBindAddr := regHTTPAddrFlag(cmd) - httpGracePeriod := regHTTPGracePeriodFlag(cmd) - grpcBindAddr, cert, 
key, clientCA := regGRPCFlags(cmd) + httpBindAddr, httpGracePeriod := regHTTPFlags(cmd) + grpcBindAddr, grpcGracePeriod, grpcCert, grpcKey, grpcClientCA := regGRPCFlags(cmd) promURL := cmd.Flag("prometheus.url", "URL at which to reach Prometheus's API. For better performance use local network."). Default("http://localhost:9090").URL() @@ -79,9 +78,10 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application) { reg, tracer, *grpcBindAddr, - *cert, - *key, - *clientCA, + time.Duration(*grpcGracePeriod), + *grpcCert, + *grpcKey, + *grpcClientCA, *httpBindAddr, time.Duration(*httpGracePeriod), *promURL, @@ -102,9 +102,10 @@ func runSidecar( reg *prometheus.Registry, tracer opentracing.Tracer, grpcBindAddr string, - cert string, - key string, - clientCA string, + grpcGracePeriod time.Duration, + grpcCert string, + grpcKey string, + grpcClientCA string, httpBindAddr string, httpGracePeriod time.Duration, promURL *url.URL, @@ -140,9 +141,9 @@ func runSidecar( // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. 
statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) - srv := server.NewHTTP(logger, reg, comp, statusProber, - server.WithListen(httpBindAddr), - server.WithGracePeriod(httpGracePeriod), + srv := httpserver.New(logger, reg, comp, statusProber, + httpserver.WithListen(httpBindAddr), + httpserver.WithGracePeriod(httpGracePeriod), ) g.Add(srv.ListenAndServe, srv.Shutdown) @@ -229,29 +230,28 @@ func runSidecar( } { - l, err := net.Listen("tcp", grpcBindAddr) - if err != nil { - return errors.Wrap(err, "listen API address") - } - logger := log.With(logger, "component", component.Sidecar.String()) - promStore, err := store.NewPrometheusStore( logger, nil, promURL, component.Sidecar, m.Labels, m.Timestamps) if err != nil { return errors.Wrap(err, "create Prometheus store") } - opts, err := defaultGRPCTLSServerOpts(logger, cert, key, clientCA) + tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA) if err != nil { return errors.Wrap(err, "setup gRPC server") } - s := newStoreGRPCServer(logger, reg, tracer, promStore, opts) + s := grpcserver.New(logger, reg, tracer, comp, promStore, + grpcserver.WithListen(grpcBindAddr), + grpcserver.WithGracePeriod(grpcGracePeriod), + grpcserver.WithTLSConfig(tlsCfg), + ) g.Add(func() error { - level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr) - return errors.Wrap(s.Serve(l), "serve gRPC") - }, func(error) { - s.Stop() + statusProber.SetReady() + return s.ListenAndServe() + }, func(err error) { + statusProber.SetNotReady(err) + s.Shutdown(err) }) } diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 54e8a63124..13833a3a12 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -2,12 +2,8 @@ package main import ( "context" - "net" "time" - "github.com/thanos-io/thanos/pkg/extflag" - "github.com/thanos-io/thanos/pkg/server" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" 
"github.com/oklog/run" @@ -16,12 +12,16 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/relabel" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/runutil" + grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" + httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/store" storecache "github.com/thanos-io/thanos/pkg/store/cache" + "github.com/thanos-io/thanos/pkg/tls" "gopkg.in/alecthomas/kingpin.v2" yaml "gopkg.in/yaml.v2" ) @@ -30,9 +30,8 @@ import ( func registerStore(m map[string]setupFunc, app *kingpin.Application) { cmd := app.Command(component.Store.String(), "store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift and Tencent COS.") - httpBindAddr := regHTTPAddrFlag(cmd) - httpGracePeriod := regHTTPGracePeriodFlag(cmd) - grpcBindAddr, cert, key, clientCA := regGRPCFlags(cmd) + httpBindAddr, httpGracePeriod := regHTTPFlags(cmd) + grpcBindAddr, grpcGracePeriod, grpcCert, grpcKey, grpcClientCA := regGRPCFlags(cmd) dataDir := cmd.Flag("data-dir", "Data directory in which to cache remote blocks."). 
Default("./data").String() @@ -81,9 +80,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { objStoreConfig, *dataDir, *grpcBindAddr, - *cert, - *key, - *clientCA, + time.Duration(*grpcGracePeriod), + *grpcCert, + *grpcKey, + *grpcClientCA, *httpBindAddr, time.Duration(*httpGracePeriod), uint64(*indexCacheSize), @@ -113,9 +113,10 @@ func runStore( objStoreConfig *extflag.PathOrContent, dataDir string, grpcBindAddr string, - cert string, - key string, - clientCA string, + grpcGracePeriod time.Duration, + grpcCert string, + grpcKey string, + grpcClientCA string, httpBindAddr string, httpGracePeriod time.Duration, indexCacheSizeBytes uint64, @@ -132,9 +133,9 @@ func runStore( ) error { // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) - srv := server.NewHTTP(logger, reg, component, statusProber, - server.WithListen(httpBindAddr), - server.WithGracePeriod(httpGracePeriod), + srv := httpserver.New(logger, reg, component, statusProber, + httpserver.WithListen(httpBindAddr), + httpserver.WithGracePeriod(httpGracePeriod), ) g.Add(srv.ListenAndServe, srv.Shutdown) @@ -225,26 +226,28 @@ func runStore( cancel() }) } + // Start query (proxy) gRPC StoreAPI. 
+ { + tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA) + if err != nil { + return errors.Wrap(err, "setup gRPC server") + } - l, err := net.Listen("tcp", grpcBindAddr) - if err != nil { - return errors.Wrap(err, "listen API address") - } + s := grpcserver.New(logger, reg, tracer, component, bs, + grpcserver.WithListen(grpcBindAddr), + grpcserver.WithGracePeriod(grpcGracePeriod), + grpcserver.WithTLSConfig(tlsCfg), + ) - opts, err := defaultGRPCTLSServerOpts(logger, cert, key, clientCA) - if err != nil { - return errors.Wrap(err, "grpc server options") + g.Add(func() error { + <-bucketStoreReady + statusProber.SetReady() + return s.ListenAndServe() + }, func(err error) { + statusProber.SetNotReady(err) + s.Shutdown(err) + }) } - s := newStoreGRPCServer(logger, reg, tracer, bs, opts) - - g.Add(func() error { - <-bucketStoreReady - level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr) - statusProber.SetReady() - return errors.Wrap(s.Serve(l), "serve gRPC") - }, func(error) { - s.Stop() - }) level.Info(logger).Log("msg", "starting store node") return nil diff --git a/docs/components/bucket.md b/docs/components/bucket.md index df6dcd284d..234a5234ac 100644 --- a/docs/components/bucket.md +++ b/docs/components/bucket.md @@ -122,7 +122,9 @@ Flags: object store configuration. See format details: https://thanos.io/storage.md/#configuration --listen="0.0.0.0:8080" HTTP host:port to listen on - --http-grace-period=5s Time to wait after an interrupt received for HTTP + --http-address="0.0.0.0:10902" + Listen host:port for HTTP endpoints. + --http-grace-period=2m Time to wait after an interrupt received for HTTP Server. 
--refresh=30m Refresh interval to download metadata from remote storage diff --git a/docs/components/compact.md b/docs/components/compact.md index df1056f571..4a710a6773 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -84,7 +84,7 @@ Flags: https://thanos.io/tracing.md/#configuration --http-address="0.0.0.0:10902" Listen host:port for HTTP endpoints. - --http-grace-period=5s Time to wait after an interrupt received for HTTP + --http-grace-period=2m Time to wait after an interrupt received for HTTP Server. --data-dir="./data" Data directory in which to cache blocks and process compactions. diff --git a/docs/components/query.md b/docs/components/query.md index 3ebf692471..2f0a29771f 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -259,12 +259,14 @@ Flags: https://thanos.io/tracing.md/#configuration --http-address="0.0.0.0:10902" Listen host:port for HTTP endpoints. - --http-grace-period=5s Time to wait after an interrupt received for + --http-grace-period=2m Time to wait after an interrupt received for HTTP Server. --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components. + --grpc-grace-period=2m Time to wait after an interrupt received for + GRPC Server. --grpc-server-tls-cert="" TLS Certificate for gRPC server, leave blank to disable TLS --grpc-server-tls-key="" TLS Key for the gRPC server, leave blank to diff --git a/docs/components/rule.md b/docs/components/rule.md index a697dc02fb..55bac66749 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -168,12 +168,14 @@ Flags: https://thanos.io/tracing.md/#configuration --http-address="0.0.0.0:10902" Listen host:port for HTTP endpoints. - --http-grace-period=5s Time to wait after an interrupt received for + --http-grace-period=2m Time to wait after an interrupt received for HTTP Server. 
--grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components. + --grpc-grace-period=2m Time to wait after an interrupt received for + GRPC Server. --grpc-server-tls-cert="" TLS Certificate for gRPC server, leave blank to disable TLS --grpc-server-tls-key="" TLS Key for the gRPC server, leave blank to diff --git a/docs/components/sidecar.md b/docs/components/sidecar.md index 5546ad20f2..4dfc4282b2 100644 --- a/docs/components/sidecar.md +++ b/docs/components/sidecar.md @@ -101,12 +101,14 @@ Flags: https://thanos.io/tracing.md/#configuration --http-address="0.0.0.0:10902" Listen host:port for HTTP endpoints. - --http-grace-period=5s Time to wait after an interrupt received for + --http-grace-period=2m Time to wait after an interrupt received for HTTP Server. --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components. + --grpc-grace-period=2m Time to wait after an interrupt received for + GRPC Server. --grpc-server-tls-cert="" TLS Certificate for gRPC server, leave blank to disable TLS --grpc-server-tls-key="" TLS Key for the gRPC server, leave blank to diff --git a/docs/components/store.md b/docs/components/store.md index 6f11cebc9d..3b2a7f5737 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -51,12 +51,14 @@ Flags: https://thanos.io/tracing.md/#configuration --http-address="0.0.0.0:10902" Listen host:port for HTTP endpoints. - --http-grace-period=5s Time to wait after an interrupt received for + --http-grace-period=2m Time to wait after an interrupt received for HTTP Server. --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components. + --grpc-grace-period=2m Time to wait after an interrupt received for + GRPC Server. 
--grpc-server-tls-cert="" TLS Certificate for gRPC server, leave blank to disable TLS --grpc-server-tls-key="" TLS Key for the gRPC server, leave blank to diff --git a/go.mod b/go.mod index 62b62543c4..454676ed2b 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v1.2.1 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 - github.com/prometheus/common v0.6.0 + github.com/prometheus/common v0.7.0 github.com/prometheus/prometheus v1.8.2-0.20190913102521-8ab628b35467 // v1.8.2 is misleading as Prometheus does not have v2 module. github.com/uber-go/atomic v1.4.0 // indirect github.com/uber/jaeger-client-go v2.16.0+incompatible @@ -49,7 +49,6 @@ require ( golang.org/x/net v0.0.0-20191014212845-da9a3fd4c582 // indirect golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 golang.org/x/sync v0.0.0-20190423024810-112230192c58 - golang.org/x/sys v0.0.0-20191010194322-b09406accb47 // indirect golang.org/x/text v0.3.2 google.golang.org/api v0.11.0 google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 diff --git a/go.sum b/go.sum index f57134141b..2ec5b5a3cd 100644 --- a/go.sum +++ b/go.sum @@ -64,6 +64,7 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/cespare/xxhash v0.0.0-20181017004759-096ff4a8a059/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/cespare/xxhash/v2 v2.1.0 h1:yTUvW7Vhb89inJ+8irsUqiWjh8iT6sQPZiQzI6ReGkA= github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tjxl5dIMyVM= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= @@ -402,6 +403,7 @@ 
github.com/prometheus/common v0.4.1 h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6 github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.6.0 h1:kRhiuYSXR3+uv2IbVbZhUxK5zVD/2pp3Gd2PpvPkpEo= github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= +github.com/prometheus/common v0.7.0 h1:L+1lyG48J1zAQXA3RBX/nG/B3gjlHq0zTt2tlbJLyCY= github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= @@ -410,6 +412,7 @@ github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNG github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE= github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= +github.com/prometheus/procfs v0.0.5 h1:3+auTFlqw+ZaQYJARz6ArODtkaIwtvBTx3N2NehQlL8= github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= github.com/prometheus/prometheus v0.0.0-20180315085919-58e2a31db8de/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s= github.com/prometheus/prometheus v1.8.2-0.20190913102521-8ab628b35467 h1:B9IMa7s163/ZDSduepHHfOZZHSKdSbgo/bFY5c+FMAs= diff --git a/pkg/server/grpc/grpc.go b/pkg/server/grpc/grpc.go new file mode 100644 index 0000000000..29774485eb --- /dev/null +++ b/pkg/server/grpc/grpc.go @@ -0,0 +1,132 @@ +package grpc + +import ( + "context" + "math" + "net" + "runtime/debug" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" + 
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/tracing" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/status" +) + +// A Server defines parameters to serve RPC requests, a wrapper around grpc.Server. +type Server struct { + logger log.Logger + comp component.Component + + srv *grpc.Server + listener net.Listener + + opts options +} + +// New creates a new Server. +func New(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, comp component.Component, storeSrv storepb.StoreServer, opts ...Option) *Server { + options := options{} + for _, o := range opts { + o.apply(&options) + } + + met := grpc_prometheus.NewServerMetrics() + met.EnableHandlingTimeHistogram( + grpc_prometheus.WithHistogramBuckets([]float64{ + 0.001, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4, + }), + ) + panicsTotal := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_grpc_req_panics_recovered_total", + Help: "Total number of gRPC requests recovered from internal panic.", + }) + reg.MustRegister(met, panicsTotal) + + grpcPanicRecoveryHandler := func(p interface{}) (err error) { + panicsTotal.Inc() + level.Error(logger).Log("msg", "recovered from panic", "panic", p, "stack", debug.Stack()) + return status.Errorf(codes.Internal, "%s", p) + } + + grpcOpts := []grpc.ServerOption{} + grpcOpts = append(grpcOpts, + grpc.MaxSendMsgSize(math.MaxInt32), + grpc_middleware.WithUnaryServerChain( + met.UnaryServerInterceptor(), + tracing.UnaryServerInterceptor(tracer), + grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), + ), + grpc_middleware.WithStreamServerChain( + 
met.StreamServerInterceptor(), + tracing.StreamServerInterceptor(tracer), + grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), + ), + ) + + if options.tlsConfig != nil { + grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(options.tlsConfig))) + } + s := grpc.NewServer(grpcOpts...) + + storepb.RegisterStoreServer(s, storeSrv) + met.InitializeMetrics(s) + + return &Server{ + logger: log.With(logger, "service", "gRPC/server", "component", comp.String()), + comp: comp, + srv: s, + opts: options, + } +} + +// ListenAndServe listens on the TCP network address and handles requests on incoming connections. +func (s *Server) ListenAndServe() error { + l, err := net.Listen("tcp", s.opts.listen) + if err != nil { + return errors.Wrapf(err, "listen gRPC on address %s", s.opts.listen) + } + s.listener = l + + level.Info(s.logger).Log("msg", "listening for StoreAPI gRPC", "address", s.opts.listen) + return errors.Wrap(s.srv.Serve(s.listener), "serve gRPC") +} + +// Shutdown gracefully shuts down the server by waiting, +// for specified amount of time (by gracePeriod) for connections to return to idle and then shut down. +func (s *Server) Shutdown(err error) { + defer level.Info(s.logger).Log("msg", "internal server shutdown", "err", err) + + if s.opts.gracePeriod == 0 { + s.srv.Stop() + return + } + + ctx, cancel := context.WithTimeout(context.Background(), s.opts.gracePeriod) + defer cancel() + + stopped := make(chan struct{}) + go func() { + level.Info(s.logger).Log("msg", "gracefully stopping internal server") + s.srv.GracefulStop() // Also closes s.listener. 
+ close(stopped) + }() + + select { + case <-ctx.Done(): + level.Info(s.logger).Log("msg", "grace period exceeded enforcing shutdown") + s.srv.Stop() + case <-stopped: + cancel() + } +} diff --git a/pkg/server/grpc/option.go b/pkg/server/grpc/option.go new file mode 100644 index 0000000000..6c804e4f79 --- /dev/null +++ b/pkg/server/grpc/option.go @@ -0,0 +1,47 @@ +package grpc + +import ( + "crypto/tls" + "time" +) + +type options struct { + gracePeriod time.Duration + listen string + + tlsConfig *tls.Config +} + +// Option overrides behavior of Server. +type Option interface { + apply(*options) +} + +type optionFunc func(*options) + +func (f optionFunc) apply(o *options) { + f(o) +} + +// WithGracePeriod sets shutdown grace period for gRPC server. +// Server waits connections to drain for specified amount of time. +func WithGracePeriod(t time.Duration) Option { + return optionFunc(func(o *options) { + o.gracePeriod = t + }) +} + +// WithListen sets address to listen for gRPC server. +// Server accepts incoming connections on given address. +func WithListen(s string) Option { + return optionFunc(func(o *options) { + o.listen = s + }) +} + +// WithTLSConfig sets TLS configuration for gRPC server. 
+func WithTLSConfig(cfg *tls.Config) Option { + return optionFunc(func(o *options) { + o.tlsConfig = cfg + }) +} diff --git a/pkg/server/http.go b/pkg/server/http/http.go similarity index 64% rename from pkg/server/http.go rename to pkg/server/http/http.go index d7b17a5a77..290d97c59a 100644 --- a/pkg/server/http.go +++ b/pkg/server/http/http.go @@ -1,21 +1,20 @@ -package server +package http import ( "context" "net/http" "net/http/pprof" - "time" - - "github.com/thanos-io/thanos/pkg/component" - "github.com/thanos-io/thanos/pkg/prober" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/prober" ) +// A Server defines parameters for serve HTTP requests, a wrapper around http.Server. type Server struct { logger log.Logger comp component.Component @@ -27,12 +26,9 @@ type Server struct { opts options } -func NewHTTP(logger log.Logger, reg *prometheus.Registry, comp component.Component, prober *prober.Prober, opts ...Option) Server { - options := options{ - gracePeriod: 5 * time.Second, - listen: "0.0.0.0:10902", - } - +// New creates a new Server. 
+func New(logger log.Logger, reg *prometheus.Registry, comp component.Component, prober *prober.Prober, opts ...Option) *Server { + options := options{} for _, o := range opts { o.apply(&options) } @@ -42,8 +38,8 @@ func NewHTTP(logger log.Logger, reg *prometheus.Registry, comp component.Compone registerProfiler(mux) prober.RegisterInMux(mux) - return Server{ - logger: log.With(logger, "service", "http/server"), + return &Server{ + logger: log.With(logger, "service", "http/server", "component", comp.String()), comp: comp, prober: prober, mux: mux, @@ -52,12 +48,15 @@ func NewHTTP(logger log.Logger, reg *prometheus.Registry, comp component.Compone } } +// ListenAndServe listens on the TCP network address and handles requests on incoming connections. func (s *Server) ListenAndServe() error { s.prober.SetHealthy() - level.Info(s.logger).Log("msg", "listening for requests and metrics", "component", s.comp.String(), "address", s.opts.listen) - return errors.Wrapf(s.srv.ListenAndServe(), "serve %s and metrics", s.comp.String()) + level.Info(s.logger).Log("msg", "listening for requests and metrics", "address", s.opts.listen) + return errors.Wrap(s.srv.ListenAndServe(), "serve HTTP and metrics") } +// Shutdown gracefully shuts down the server by waiting, +// for specified amount of time (by gracePeriod) for connections to return to idle and then shut down. 
func (s *Server) Shutdown(err error) { s.prober.SetNotReady(err) defer s.prober.SetNotHealthy(err) @@ -67,16 +66,22 @@ func (s *Server) Shutdown(err error) { return } + defer level.Info(s.logger).Log("msg", "internal server shutdown", "err", err) + + if s.opts.gracePeriod == 0 { + s.srv.Close() + return + } + ctx, cancel := context.WithTimeout(context.Background(), s.opts.gracePeriod) defer cancel() - level.Info(s.logger).Log("msg", "server shut down internal server") - if err := s.srv.Shutdown(ctx); err != nil { - level.Error(s.logger).Log("msg", "server shut down failed", "err", err, "component", s.comp.String()) + level.Error(s.logger).Log("msg", "internal server shut down failed", "err", err) } } +// Handle registers the handler for the given pattern. func (s *Server) Handle(pattern string, handler http.Handler) { s.mux.Handle(pattern, handler) } diff --git a/pkg/server/option.go b/pkg/server/http/option.go similarity index 64% rename from pkg/server/option.go rename to pkg/server/http/option.go index 702b6dbecc..ab7ff59e0c 100644 --- a/pkg/server/option.go +++ b/pkg/server/http/option.go @@ -1,4 +1,4 @@ -package server +package http import ( "time" @@ -20,12 +20,16 @@ func (f optionFunc) apply(o *options) { f(o) } +// WithGracePeriod sets shutdown grace period for HTTP server. +// Server waits connections to drain for specified amount of time. func WithGracePeriod(t time.Duration) Option { return optionFunc(func(o *options) { o.gracePeriod = t }) } +// WithListen sets address to listen for HTTP server. +// Server accepts incoming TCP connections on given address. 
func WithListen(s string) Option { return optionFunc(func(o *options) { o.listen = s diff --git a/pkg/tls/options.go b/pkg/tls/options.go new file mode 100644 index 0000000000..58b6c56d36 --- /dev/null +++ b/pkg/tls/options.go @@ -0,0 +1,104 @@ +package tls + +import ( + "crypto/tls" + "crypto/x509" + "io/ioutil" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" +) + +// NewServerConfig builds the server-side TLS configuration from the cert, key and optional client CA file paths; it returns (nil, nil) when neither cert nor key is set, which means TLS is disabled. +func NewServerConfig(logger log.Logger, cert, key, clientCA string) (*tls.Config, error) { + if key == "" && cert == "" { + if clientCA != "" { + return nil, errors.New("when a client CA is used a server key and certificate must also be provided") + } + + level.Info(logger).Log("msg", "disabled TLS, key and cert must be set to enable") + return nil, nil + } + + level.Info(logger).Log("msg", "enabling server side TLS") + + if key == "" || cert == "" { + return nil, errors.New("both server key and certificate must be provided") + } + + tlsCfg := &tls.Config{ + MinVersion: tls.VersionTLS12, + } + + tlsCert, err := tls.LoadX509KeyPair(cert, key) + if err != nil { + return nil, errors.Wrap(err, "server credentials") + } + + tlsCfg.Certificates = []tls.Certificate{tlsCert} + + if clientCA != "" { + caPEM, err := ioutil.ReadFile(clientCA) + if err != nil { + return nil, errors.Wrap(err, "reading client CA") + } + + certPool := x509.NewCertPool() + if !certPool.AppendCertsFromPEM(caPEM) { + return nil, errors.New("building client CA") // err is nil here; errors.Wrap(nil, ...) would return a nil error and silently disable TLS. + } + tlsCfg.ClientCAs = certPool + tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert + + level.Info(logger).Log("msg", "server TLS client verification enabled") + } + + return tlsCfg, nil +} + +// NewClientConfig provides new client TLS configuration. 
+func NewClientConfig(logger log.Logger, cert, key, caCert, serverName string) (*tls.Config, error) { + var certPool *x509.CertPool + if caCert != "" { + caPEM, err := ioutil.ReadFile(caCert) + if err != nil { + return nil, errors.Wrap(err, "reading client CA") + } + + certPool = x509.NewCertPool() + if !certPool.AppendCertsFromPEM(caPEM) { + return nil, errors.New("building client CA") // err is nil here; errors.Wrap(nil, ...) would return a nil error alongside a nil config. + } + level.Info(logger).Log("msg", "TLS client using provided certificate pool") + } else { + var err error + certPool, err = x509.SystemCertPool() + if err != nil { + return nil, errors.Wrap(err, "reading system certificate pool") + } + level.Info(logger).Log("msg", "TLS client using system certificate pool") + } + + tlsCfg := &tls.Config{ + RootCAs: certPool, + } + + if serverName != "" { + tlsCfg.ServerName = serverName + } + + if (key != "") != (cert != "") { + return nil, errors.New("both client key and certificate must be provided") + } + + if cert != "" { + cert, err := tls.LoadX509KeyPair(cert, key) + if err != nil { + return nil, errors.Wrap(err, "client credentials") + } + tlsCfg.Certificates = []tls.Certificate{cert} + level.Info(logger).Log("msg", "TLS client authentication enabled") + } + return tlsCfg, nil +} diff --git a/pkg/ui/bucket.go b/pkg/ui/bucket.go index 84dfdbfdb5..f1fdd8ee47 100644 --- a/pkg/ui/bucket.go +++ b/pkg/ui/bucket.go @@ -7,7 +7,7 @@ import ( "github.com/go-kit/kit/log" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" - "github.com/thanos-io/thanos/pkg/server" + httpserver "github.com/thanos-io/thanos/pkg/server/http" ) // Bucket is a web UI representing state of buckets as a timeline. @@ -30,7 +30,7 @@ func NewBucketUI(logger log.Logger, label string) *Bucket { } // Register registers http routes for bucket UI. 
-func (b *Bucket) Register(s server.Server, ins extpromhttp.InstrumentationMiddleware) { +func (b *Bucket) Register(s *httpserver.Server, ins extpromhttp.InstrumentationMiddleware) { instrf := func(name string, next func(w http.ResponseWriter, r *http.Request)) http.HandlerFunc { return ins.NewHandler(name, http.HandlerFunc(next)) } diff --git a/scripts/quickstart.sh b/scripts/quickstart.sh index faff7bcc76..ccdee968b6 100755 --- a/scripts/quickstart.sh +++ b/scripts/quickstart.sh @@ -23,8 +23,7 @@ fi # Start local object storage, if desired. # NOTE: If you would like to use an actual S3-compatible API with this setup # set the S3_* environment variables set in the Minio example. -if [ -n "${MINIO_ENABLED}" ] -then +if [ -n "${MINIO_ENABLED}" ]; then if [ ! $(command -v $MINIO_EXECUTABLE) ]; then echo "Cannot find or execute Minio binary $MINIO_EXECUTABLE, you can override it by setting the MINIO_EXECUTABLE env variable" exit 1 @@ -49,14 +48,14 @@ then mkdir -p data/minio ${MINIO_EXECUTABLE} server ./data/minio \ - --address ${MINIO_ENDPOINT} & + --address ${MINIO_ENDPOINT} & sleep 3 # create the bucket ${MC_EXECUTABLE} config host add tmp http://${MINIO_ENDPOINT} ${MINIO_ACCESS_KEY} ${MINIO_SECRET_KEY} ${MC_EXECUTABLE} mb tmp/${MINIO_BUCKET} ${MC_EXECUTABLE} config host rm tmp - cat < data/bucket.yml + cat <data/bucket.yml type: S3 config: bucket: $S3_BUCKET @@ -71,12 +70,11 @@ fi STORES="" # Start three Prometheus servers monitoring themselves. 
-for i in `seq 0 2` -do +for i in $(seq 0 2); do rm -rf data/prom${i} mkdir -p data/prom${i}/ - cat > data/prom${i}/prometheus.yml <<- EOF + cat >data/prom${i}/prometheus.yml <<-EOF global: external_labels: prometheus: prom-${i} @@ -110,13 +108,13 @@ scrape_configs: EOF ${PROMETHEUS_EXECUTABLE} \ - --config.file data/prom${i}/prometheus.yml \ - --storage.tsdb.path data/prom${i} \ - --log.level warn \ + --config.file data/prom${i}/prometheus.yml \ + --storage.tsdb.path data/prom${i} \ + --log.level warn \ --web.enable-lifecycle \ --storage.tsdb.min-block-duration=2h \ --storage.tsdb.max-block-duration=2h \ - --web.listen-address 0.0.0.0:909${i} & + --web.listen-address 0.0.0.0:909${i} & sleep 0.25 done @@ -124,20 +122,21 @@ done sleep 0.5 OBJSTORECFG="" -if [ -n "${MINIO_ENABLED}" ] -then -OBJSTORECFG="--objstore.config-file data/bucket.yml" +if [ -n "${MINIO_ENABLED}" ]; then + OBJSTORECFG="--objstore.config-file data/bucket.yml" fi # Start one sidecar for each Prometheus server. -for i in `seq 0 2` -do +for i in $(seq 0 2); do ${THANOS_EXECUTABLE} sidecar \ - --debug.name sidecar-${i} \ - --grpc-address 0.0.0.0:109${i}1 \ - --http-address 0.0.0.0:109${i}2 \ - --prometheus.url http://localhost:909${i} \ - --tsdb.path data/prom${i} \ + --debug.name sidecar-${i} \ + --log.level debug \ + --grpc-address 0.0.0.0:109${i}1 \ + --grpc-grace-period 1s \ + --http-address 0.0.0.0:109${i}2 \ + --http-grace-period 1s \ + --prometheus.url http://localhost:909${i} \ + --tsdb.path data/prom${i} \ ${OBJSTORECFG} & STORES="${STORES} --store 127.0.0.1:109${i}1" @@ -147,14 +146,15 @@ done sleep 0.5 -if [ -n "${GCS_BUCKET}" -o -n "${S3_ENDPOINT}" ] -then +if [ -n "${GCS_BUCKET}" -o -n "${S3_ENDPOINT}" ]; then ${THANOS_EXECUTABLE} store \ - --debug.name store \ - --log.level debug \ - --grpc-address 0.0.0.0:10905 \ - --http-address 0.0.0.0:10906 \ - --data-dir data/store \ + --debug.name store \ + --log.level debug \ + --grpc-address 0.0.0.0:10905 \ + --grpc-grace-period 1s \ + 
--http-address 0.0.0.0:10906 \ + --http-grace-period 1s \ + --data-dir data/store \ ${OBJSTORECFG} & STORES="${STORES} --store 127.0.0.1:10905" @@ -162,20 +162,22 @@ fi sleep 0.5 -if [ -n "${REMOTE_WRITE_ENABLED}" ] -then +if [ -n "${REMOTE_WRITE_ENABLED}" ]; then ${THANOS_EXECUTABLE} receive \ - --debug.name receive \ - --log.level debug \ - --tsdb.path "./data/remote-write-receive-data" \ - --grpc-address 0.0.0.0:10907 \ - --http-address 0.0.0.0:10909 \ - --label "receive=\"true\"" \ + --debug.name receive \ + --log.level debug \ + --log.level debug \ + --tsdb.path "./data/remote-write-receive-data" \ + --grpc-address 0.0.0.0:10907 \ + --grpc-grace-period 1s \ + --http-address 0.0.0.0:10909 \ + --http-grace-period 1s \ + --label "receive=\"true\"" \ ${OBJSTORECFG} \ - --remote-write.address 0.0.0.0:10908 & + --remote-write.address 0.0.0.0:10908 & mkdir -p "data/local-prometheus-data/" - cat < data/local-prometheus-data/prometheus.yml + cat <data/local-prometheus-data/prometheus.yml # When the Thanos remote-write-receive component is started, # this is an example configuration of a Prometheus server that # would scrape a local node-exporter and replicate its data to @@ -198,15 +200,16 @@ fi sleep 0.5 # Start two query nodes. 
-for i in `seq 0 1` -do +for i in $(seq 0 1); do ${THANOS_EXECUTABLE} query \ - --debug.name query-${i} \ - --grpc-address 0.0.0.0:109${i}3 \ - --http-address 0.0.0.0:109${i}4 \ - --query.replica-label prometheus \ + --debug.name query-${i} \ + --log.level debug \ + --grpc-address 0.0.0.0:109${i}3 \ + --grpc-grace-period 1s \ + --http-address 0.0.0.0:109${i}4 \ + --http-grace-period 1s \ + --query.replica-label prometheus \ ${STORES} & done wait - diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go index 65c636771d..9c6b3b09ad 100644 --- a/test/e2e/spinup_test.go +++ b/test/e2e/spinup_test.go @@ -156,6 +156,7 @@ func sidecar(http, grpc address, prom *prometheusScheduler) *serverScheduler { return newCmdExec(exec.Command("thanos", "sidecar", "--debug.name", fmt.Sprintf("sidecar-%s", http.Port), "--grpc-address", grpc.HostPort(), + "--grpc-grace-period", "0s", "--http-address", http.HostPort(), "--prometheus.url", prom.HTTP.URL(), "--tsdb.path", promDir, @@ -190,6 +191,7 @@ func receiver(http, grpc, metric address, replicationFactor int, hashring ...rec return newCmdExec(exec.Command("thanos", "receive", "--debug.name", fmt.Sprintf("receive-%s", http.Port), "--grpc-address", grpc.HostPort(), + "--grpc-grace-period", "0s", "--http-address", metric.HostPort(), "--remote-write.address", http.HostPort(), "--label", fmt.Sprintf(`receive="%s"`, http.Port), @@ -213,6 +215,7 @@ func querier(http, grpc address, storeAddresses []address, fileSDStoreAddresses "query", "--debug.name", fmt.Sprintf("querier-%s", http.Port), "--grpc-address", grpc.HostPort(), + "--grpc-grace-period", "0s", "--http-address", http.HostPort(), "--log.level", "debug", "--query.replica-label", replicaLabel, @@ -259,6 +262,7 @@ func storeGateway(http, grpc address, bucketConfig []byte, relabelConfig []byte) "--debug.name", fmt.Sprintf("store-gw-%s", http.Port), "--data-dir", dbDir, "--grpc-address", grpc.HostPort(), + "--grpc-grace-period", "0s", "--http-address", http.HostPort(), 
"--log.level", "debug", "--objstore.config", string(bucketConfig), @@ -332,6 +336,7 @@ func ruleWithDir(http, grpc address, dir string, rules []string, am address, que "--eval-interval", "1s", "--alertmanagers.url", am.URL(), "--grpc-address", grpc.HostPort(), + "--grpc-grace-period", "0s", "--http-address", http.HostPort(), "--log.level", "debug", "--query.sd-dns-interval", "5s",