From 9a2923b9f0cbfef94a9c760adf86711a70fc5660 Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Thu, 29 Aug 2019 16:54:36 +0200 Subject: [PATCH 1/3] Initialize grpc server metrics Signed-off-by: Kemal Akkoyun --- cmd/thanos/main.go | 4 ++-- cmd/thanos/query.go | 4 +++- cmd/thanos/receive.go | 5 ++++- cmd/thanos/rule.go | 5 ++++- cmd/thanos/sidecar.go | 5 ++++- cmd/thanos/store.go | 5 ++++- 6 files changed, 21 insertions(+), 7 deletions(-) diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index 2e6f2bf413..eaa8b0e850 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -248,8 +248,7 @@ func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) { // - request histogram // - tracing // - panic recovery with panic counter -func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, cert, key, clientCA string) ([]grpc.ServerOption, error) { - met := grpc_prometheus.NewServerMetrics() +func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, met *grpc_prometheus.ServerMetrics, cert, key, clientCA string) ([]grpc.ServerOption, error) { met.EnableHandlingTimeHistogram( grpc_prometheus.WithHistogramBuckets([]float64{ 0.001, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4, @@ -265,6 +264,7 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o level.Error(logger).Log("msg", "recovered from panic", "panic", p, "stack", debug.Stack()) return status.Errorf(codes.Internal, "%s", p) } + reg.MustRegister(met, panicsTotal) opts := []grpc.ServerOption{ grpc.MaxSendMsgSize(math.MaxInt32), diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 4577ac0b3a..4448f76255 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -441,13 +441,15 @@ func runQuery( } logger := log.With(logger, "component", component.Query.String()) - opts, err := defaultGRPCServerOpts(logger, reg, tracer, srvCert, srvKey, srvClientCA) + met := grpc_prometheus.NewServerMetrics() + opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, srvCert, srvKey, srvClientCA) if err != nil { return errors.Wrapf(err, "build gRPC server") } s := grpc.NewServer(opts...) storepb.RegisterStoreServer(s, proxy) + met.InitializeMetrics(s) g.Add(func() error { level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 78c8c1bd1b..f6d96474f9 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/oklog/run" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -267,12 +268,14 @@ func runReceive( db := localStorage.Get() tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, lset) - opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA) + met := grpc_prometheus.NewServerMetrics() + opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA) if err != nil { return errors.Wrap(err, "setup gRPC server") } s = grpc.NewServer(opts...) storepb.RegisterStoreServer(s, tsdbStore) + met.InitializeMetrics(s) level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr) return errors.Wrap(s.Serve(l), "serve gRPC") diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 6226ca4479..8e2284776d 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -19,6 +19,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/oklog/run" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -493,12 +494,14 @@ func runRule( store := store.NewTSDBStore(logger, reg, db, component.Rule, lset) - opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA) + met := grpc_prometheus.NewServerMetrics() + opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA) if err != nil { return errors.Wrap(err, "setup gRPC options") } s := grpc.NewServer(opts...) storepb.RegisterStoreServer(s, store) + met.InitializeMetrics(s) g.Add(func() error { return errors.Wrap(s.Serve(l), "serve gRPC") diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 3e4655c5d1..fdb125687f 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/oklog/run" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -220,12 +221,14 @@ func runSidecar( return errors.Wrap(err, "create Prometheus store") } - opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA) + met := grpc_prometheus.NewServerMetrics() + opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA) if err != nil { return errors.Wrap(err, "setup gRPC server") } s := grpc.NewServer(opts...) storepb.RegisterStoreServer(s, promStore) + met.InitializeMetrics(s) g.Add(func() error { level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 23a97b344a..afa29fdfe3 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -7,6 +7,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/oklog/run" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -187,13 +188,15 @@ func runStore( return errors.Wrap(err, "listen API address") } - opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA) + met := grpc_prometheus.NewServerMetrics() + opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA) if err != nil { return errors.Wrap(err, "grpc server options") } s := grpc.NewServer(opts...) storepb.RegisterStoreServer(s, bs) + met.InitializeMetrics(s) g.Add(func() error { level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr) From b039fc501064ffac6166cc5b5b068135731e3b8e Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Thu, 29 Aug 2019 16:59:16 +0200 Subject: [PATCH 2/3] Ad change log Signed-off-by: Kemal Akkoyun --- CHANGELOG.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a76eceb96a..711fd581c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,9 +20,10 @@ Accepted into CNCF: ### Added +- [#1478](https://github.com/thanos-io/thanos/pull/1478) Thanos components now exposes gRPC server metrics as soon as server starts, to provide more reliable data for instrumentation. - [#1378](https://github.com/thanos-io/thanos/pull/1378) Thanos Receive now exposes `thanos_receive_config_hash`, `thanos_receive_config_last_reload_successful` and `thanos_receive_config_last_reload_success_timestamp_seconds` metrics to track latest configuration change -- [#1268](https://github.com/thanos-io/thanos/pull/1268) Thanos Sidecar added support for newest Prometheus streaming remote read added [here](https://github.com/prometheus/prometheus/pull/5703). This massively improves memory required by single - request for both Prometheus and sidecar. Single requests now should take constant amount of memory on sidecar, so resource consumption prediction is now straightforward. This will be used if you have Prometheus `2.13` or `2.12-master`. +- [#1268](https://github.com/thanos-io/thanos/pull/1268) Thanos Sidecar added support for newest Prometheus streaming remote read added [here](https://github.com/prometheus/prometheus/pull/5703). This massively improves memory required by single + request for both Prometheus and sidecar. Single requests now should take constant amount of memory on sidecar, so resource consumption prediction is now straightforward. This will be used if you have Prometheus `2.13` or `2.12-master`. - [#1358](https://github.com/thanos-io/thanos/pull/1358) Added `part_size` configuration option for HTTP multipart requests minimum part size for S3 storage type - [#1363](https://github.com/thanos-io/thanos/pull/1363) Thanos Receive now exposes `thanos_receive_hashring_nodes` and `thanos_receive_hashring_tenants` metrics to monitor status of hash-rings - [#1395](https://github.com/thanos-io/thanos/pull/1395) Thanos Sidecar added `/-/ready` and `/-/healthy` endpoints to Thanos sidecar. @@ -39,7 +40,7 @@ Accepted into CNCF: - [BUGFIX] prometheus_tsdb_compactions_failed_total is now incremented on any compaction failure. tsdb#613 - [BUGFIX] PromQL: Correctly display {__name__="a"}. - [#1338](https://github.com/thanos-io/thanos/pull/1338) Thanos Query still warns on store API duplicate, but allows a single one from duplicated set. This is gracefully warn about the problematic logic and not disrupt immediately. -- [#1385](https://github.com/thanos-io/thanos/pull/1385) Thanos Compact exposes flag to disable downsampling `downsampling.disable`. +- [#1385](https://github.com/thanos-io/thanos/pull/1385) Thanos Compact exposes flag to disable downsampling `downsampling.disable`. ### Fixed From 1bf2966bcfcd651b435520ae5ae50c685215be40 Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Mon, 2 Sep 2019 11:41:48 +0200 Subject: [PATCH 3/3] Refactor. Separate server initialization from server options Signed-off-by: Kemal Akkoyun --- cmd/thanos/main.go | 77 +++++++++++++++++++++++-------------------- cmd/thanos/query.go | 9 ++--- cmd/thanos/receive.go | 9 ++--- cmd/thanos/rule.go | 9 ++--- cmd/thanos/sidecar.go | 9 ++--- cmd/thanos/store.go | 11 ++----- 6 files changed, 52 insertions(+), 72 deletions(-) diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index eaa8b0e850..b44a2c9fc0 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/common/version" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" "github.com/thanos-io/thanos/pkg/tracing/client" "go.uber.org/automaxprocs/maxprocs" @@ -244,41 +245,8 @@ func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) { mux.Handle("/metrics", promhttp.HandlerFor(g, promhttp.HandlerOpts{})) } -// defaultGRPCServerOpts returns default gRPC server opts that includes: -// - request histogram -// - tracing -// - panic recovery with panic counter -func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, met *grpc_prometheus.ServerMetrics, cert, key, clientCA string) ([]grpc.ServerOption, error) { - met.EnableHandlingTimeHistogram( - grpc_prometheus.WithHistogramBuckets([]float64{ - 0.001, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4, - }), - ) - - panicsTotal := prometheus.NewCounter(prometheus.CounterOpts{ - Name: "thanos_grpc_req_panics_recovered_total", - Help: "Total number of gRPC requests recovered from internal panic.", - }) - grpcPanicRecoveryHandler := func(p interface{}) (err error) { - panicsTotal.Inc() - level.Error(logger).Log("msg", "recovered from panic", "panic", p, "stack", debug.Stack()) - return status.Errorf(codes.Internal, "%s", p) - } - - reg.MustRegister(met, panicsTotal) - opts := []grpc.ServerOption{ - grpc.MaxSendMsgSize(math.MaxInt32), - grpc_middleware.WithUnaryServerChain( - met.UnaryServerInterceptor(), - tracing.UnaryServerInterceptor(tracer), - grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), - ), - grpc_middleware.WithStreamServerChain( - met.StreamServerInterceptor(), - tracing.StreamServerInterceptor(tracer), - grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), - ), - } +func defaultGRPCServerOpts(logger log.Logger, cert, key, clientCA string) ([]grpc.ServerOption, error) { + opts := []grpc.ServerOption{} if key == "" && cert == "" { if clientCA != "" { @@ -325,6 +293,45 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o return append(opts, grpc.Creds(credentials.NewTLS(tlsCfg))), nil } +func newStoreGRPCServer(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, srv storepb.StoreServer, opts []grpc.ServerOption) *grpc.Server { + met := grpc_prometheus.NewServerMetrics() + met.EnableHandlingTimeHistogram( + grpc_prometheus.WithHistogramBuckets([]float64{ + 0.001, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4, + }), + ) + panicsTotal := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_grpc_req_panics_recovered_total", + Help: "Total number of gRPC requests recovered from internal panic.", + }) + reg.MustRegister(met, panicsTotal) + + grpcPanicRecoveryHandler := func(p interface{}) (err error) { + panicsTotal.Inc() + level.Error(logger).Log("msg", "recovered from panic", "panic", p, "stack", debug.Stack()) + return status.Errorf(codes.Internal, "%s", p) + } + opts = append(opts, + grpc.MaxSendMsgSize(math.MaxInt32), + grpc_middleware.WithUnaryServerChain( + met.UnaryServerInterceptor(), + tracing.UnaryServerInterceptor(tracer), + grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), + ), + grpc_middleware.WithStreamServerChain( + met.StreamServerInterceptor(), + tracing.StreamServerInterceptor(tracer), + grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), + ), + ) + + s := grpc.NewServer(opts...) + storepb.RegisterStoreServer(s, srv) + met.InitializeMetrics(s) + + return s +} + // TODO Remove once all components are migrated to the new defaultHTTPListener. // metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics. func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error { diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 4448f76255..0eb2ac0399 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -34,7 +34,6 @@ import ( v1 "github.com/thanos-io/thanos/pkg/query/api" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store" - "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" "github.com/thanos-io/thanos/pkg/ui" "google.golang.org/grpc" @@ -441,15 +440,11 @@ func runQuery( } logger := log.With(logger, "component", component.Query.String()) - met := grpc_prometheus.NewServerMetrics() - opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, srvCert, srvKey, srvClientCA) + opts, err := defaultGRPCServerOpts(logger, srvCert, srvKey, srvClientCA) if err != nil { return errors.Wrapf(err, "build gRPC server") } - - s := grpc.NewServer(opts...) - storepb.RegisterStoreServer(s, proxy) - met.InitializeMetrics(s) + s := newStoreGRPCServer(logger, reg, tracer, proxy, opts) g.Add(func() error { level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index f6d96474f9..85cd5a0532 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -10,7 +10,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/oklog/run" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -25,7 +24,6 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" - "github.com/thanos-io/thanos/pkg/store/storepb" "google.golang.org/grpc" kingpin "gopkg.in/alecthomas/kingpin.v2" ) @@ -268,14 +266,11 @@ func runReceive( db := localStorage.Get() tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, lset) - met := grpc_prometheus.NewServerMetrics() - opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA) + opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA) if err != nil { return errors.Wrap(err, "setup gRPC server") } - s = grpc.NewServer(opts...) - storepb.RegisterStoreServer(s, tsdbStore) - met.InitializeMetrics(s) + s := newStoreGRPCServer(logger, reg, tracer, tsdbStore, opts) level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr) return errors.Wrap(s.Serve(l), "serve gRPC") diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 8e2284776d..43ce9a0390 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -19,7 +19,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/oklog/run" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -51,7 +50,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" "github.com/thanos-io/thanos/pkg/ui" - "google.golang.org/grpc" kingpin "gopkg.in/alecthomas/kingpin.v2" ) @@ -494,14 +492,11 @@ func runRule( store := store.NewTSDBStore(logger, reg, db, component.Rule, lset) - met := grpc_prometheus.NewServerMetrics() - opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA) + opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA) if err != nil { return errors.Wrap(err, "setup gRPC options") } - s := grpc.NewServer(opts...) - storepb.RegisterStoreServer(s, store) - met.InitializeMetrics(s) + s := newStoreGRPCServer(logger, reg, tracer, store, opts) g.Add(func() error { return errors.Wrap(s.Serve(l), "serve gRPC") diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index fdb125687f..2a3c5afe9e 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -10,7 +10,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/oklog/run" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -27,7 +26,6 @@ import ( "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/storepb" - "google.golang.org/grpc" "gopkg.in/alecthomas/kingpin.v2" ) @@ -221,14 +219,11 @@ func runSidecar( return errors.Wrap(err, "create Prometheus store") } - met := grpc_prometheus.NewServerMetrics() - opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA) + opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA) if err != nil { return errors.Wrap(err, "setup gRPC server") } - s := grpc.NewServer(opts...) - storepb.RegisterStoreServer(s, promStore) - met.InitializeMetrics(s) + s := newStoreGRPCServer(logger, reg, tracer, promStore, opts) g.Add(func() error { level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index afa29fdfe3..c3f7ccb871 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -7,7 +7,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/oklog/run" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -17,8 +16,6 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store" storecache "github.com/thanos-io/thanos/pkg/store/cache" - "github.com/thanos-io/thanos/pkg/store/storepb" - "google.golang.org/grpc" kingpin "gopkg.in/alecthomas/kingpin.v2" ) @@ -188,15 +185,11 @@ func runStore( return errors.Wrap(err, "listen API address") } - met := grpc_prometheus.NewServerMetrics() - opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA) + opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA) if err != nil { return errors.Wrap(err, "grpc server options") } - - s := grpc.NewServer(opts...) - storepb.RegisterStoreServer(s, bs) - met.InitializeMetrics(s) + s := newStoreGRPCServer(logger, reg, tracer, bs, opts) g.Add(func() error { level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)