diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index eaa8b0e8509..b44a2c9fc09 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/common/version" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" "github.com/thanos-io/thanos/pkg/tracing/client" "go.uber.org/automaxprocs/maxprocs" @@ -244,41 +245,8 @@ func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) { mux.Handle("/metrics", promhttp.HandlerFor(g, promhttp.HandlerOpts{})) } -// defaultGRPCServerOpts returns default gRPC server opts that includes: -// - request histogram -// - tracing -// - panic recovery with panic counter -func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, met *grpc_prometheus.ServerMetrics, cert, key, clientCA string) ([]grpc.ServerOption, error) { - met.EnableHandlingTimeHistogram( - grpc_prometheus.WithHistogramBuckets([]float64{ - 0.001, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4, - }), - ) - - panicsTotal := prometheus.NewCounter(prometheus.CounterOpts{ - Name: "thanos_grpc_req_panics_recovered_total", - Help: "Total number of gRPC requests recovered from internal panic.", - }) - grpcPanicRecoveryHandler := func(p interface{}) (err error) { - panicsTotal.Inc() - level.Error(logger).Log("msg", "recovered from panic", "panic", p, "stack", debug.Stack()) - return status.Errorf(codes.Internal, "%s", p) - } - - reg.MustRegister(met, panicsTotal) - opts := []grpc.ServerOption{ - grpc.MaxSendMsgSize(math.MaxInt32), - grpc_middleware.WithUnaryServerChain( - met.UnaryServerInterceptor(), - tracing.UnaryServerInterceptor(tracer), - grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), - ), - grpc_middleware.WithStreamServerChain( - met.StreamServerInterceptor(), - tracing.StreamServerInterceptor(tracer), - grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), - ), - } +func defaultGRPCServerOpts(logger log.Logger, cert, key, clientCA string) ([]grpc.ServerOption, error) { + opts := []grpc.ServerOption{} if key == "" && cert == "" { if clientCA != "" { @@ -325,6 +293,45 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o return append(opts, grpc.Creds(credentials.NewTLS(tlsCfg))), nil } +func newStoreGRPCServer(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, srv storepb.StoreServer, opts []grpc.ServerOption) *grpc.Server { + met := grpc_prometheus.NewServerMetrics() + met.EnableHandlingTimeHistogram( + grpc_prometheus.WithHistogramBuckets([]float64{ + 0.001, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4, + }), + ) + panicsTotal := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_grpc_req_panics_recovered_total", + Help: "Total number of gRPC requests recovered from internal panic.", + }) + reg.MustRegister(met, panicsTotal) + + grpcPanicRecoveryHandler := func(p interface{}) (err error) { + panicsTotal.Inc() + level.Error(logger).Log("msg", "recovered from panic", "panic", p, "stack", debug.Stack()) + return status.Errorf(codes.Internal, "%s", p) + } + opts = append(opts, + grpc.MaxSendMsgSize(math.MaxInt32), + grpc_middleware.WithUnaryServerChain( + met.UnaryServerInterceptor(), + tracing.UnaryServerInterceptor(tracer), + grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), + ), + grpc_middleware.WithStreamServerChain( + met.StreamServerInterceptor(), + tracing.StreamServerInterceptor(tracer), + grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)), + ), + ) + + s := grpc.NewServer(opts...) + storepb.RegisterStoreServer(s, srv) + met.InitializeMetrics(s) + + return s +} + // TODO Remove once all components are migrated to the new defaultHTTPListener. // metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics. func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error { diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 4448f762552..0eb2ac03995 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -34,7 +34,6 @@ import ( v1 "github.com/thanos-io/thanos/pkg/query/api" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store" - "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" "github.com/thanos-io/thanos/pkg/ui" "google.golang.org/grpc" @@ -441,15 +440,11 @@ func runQuery( } logger := log.With(logger, "component", component.Query.String()) - met := grpc_prometheus.NewServerMetrics() - opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, srvCert, srvKey, srvClientCA) + opts, err := defaultGRPCServerOpts(logger, srvCert, srvKey, srvClientCA) if err != nil { return errors.Wrapf(err, "build gRPC server") } - - s := grpc.NewServer(opts...) - storepb.RegisterStoreServer(s, proxy) - met.InitializeMetrics(s) + s := newStoreGRPCServer(logger, reg, tracer, proxy, opts) g.Add(func() error { level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index f6d96474f91..85cd5a05326 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -10,7 +10,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/oklog/run" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -25,7 +24,6 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" - "github.com/thanos-io/thanos/pkg/store/storepb" "google.golang.org/grpc" kingpin "gopkg.in/alecthomas/kingpin.v2" ) @@ -268,14 +266,11 @@ func runReceive( db := localStorage.Get() tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, lset) - met := grpc_prometheus.NewServerMetrics() - opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA) + opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA) if err != nil { return errors.Wrap(err, "setup gRPC server") } - s = grpc.NewServer(opts...) - storepb.RegisterStoreServer(s, tsdbStore) - met.InitializeMetrics(s) + s := newStoreGRPCServer(logger, reg, tracer, tsdbStore, opts) level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr) return errors.Wrap(s.Serve(l), "serve gRPC") diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 8e2284776d1..43ce9a0390a 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -19,7 +19,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/oklog/run" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -51,7 +50,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" "github.com/thanos-io/thanos/pkg/ui" - "google.golang.org/grpc" kingpin "gopkg.in/alecthomas/kingpin.v2" ) @@ -494,14 +492,11 @@ func runRule( store := store.NewTSDBStore(logger, reg, db, component.Rule, lset) - met := grpc_prometheus.NewServerMetrics() - opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA) + opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA) if err != nil { return errors.Wrap(err, "setup gRPC options") } - s := grpc.NewServer(opts...) - storepb.RegisterStoreServer(s, store) - met.InitializeMetrics(s) + s := newStoreGRPCServer(logger, reg, tracer, store, opts) g.Add(func() error { return errors.Wrap(s.Serve(l), "serve gRPC") diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index fdb125687fb..2a3c5afe9e3 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -10,7 +10,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/oklog/run" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -27,7 +26,6 @@ import ( "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/storepb" - "google.golang.org/grpc" "gopkg.in/alecthomas/kingpin.v2" ) @@ -221,14 +219,11 @@ func runSidecar( return errors.Wrap(err, "create Prometheus store") } - met := grpc_prometheus.NewServerMetrics() - opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA) + opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA) if err != nil { return errors.Wrap(err, "setup gRPC server") } - s := grpc.NewServer(opts...) - storepb.RegisterStoreServer(s, promStore) - met.InitializeMetrics(s) + s := newStoreGRPCServer(logger, reg, tracer, promStore, opts) g.Add(func() error { level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index afa29fdfe38..c3f7ccb8710 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -7,7 +7,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/oklog/run" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -17,8 +16,6 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store" storecache "github.com/thanos-io/thanos/pkg/store/cache" - "github.com/thanos-io/thanos/pkg/store/storepb" - "google.golang.org/grpc" kingpin "gopkg.in/alecthomas/kingpin.v2" ) @@ -188,15 +185,11 @@ func runStore( return errors.Wrap(err, "listen API address") } - met := grpc_prometheus.NewServerMetrics() - opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA) + opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA) if err != nil { return errors.Wrap(err, "grpc server options") } - - s := grpc.NewServer(opts...) - storepb.RegisterStoreServer(s, bs) - met.InitializeMetrics(s) + s := newStoreGRPCServer(logger, reg, tracer, bs, opts) g.Add(func() error { level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)