Skip to content

Commit

Permalink
Refactor. Separate server initialization from server options
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
kakkoyun committed Sep 2, 2019
1 parent f1109b4 commit 1636ff3
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 72 deletions.
77 changes: 42 additions & 35 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/prometheus/common/version"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/tracing/client"
"go.uber.org/automaxprocs/maxprocs"
Expand Down Expand Up @@ -244,41 +245,8 @@ func registerMetrics(mux *http.ServeMux, g prometheus.Gatherer) {
mux.Handle("/metrics", promhttp.HandlerFor(g, promhttp.HandlerOpts{}))
}

// defaultGRPCServerOpts returns default gRPC server opts that includes:
// - request histogram
// - tracing
// - panic recovery with panic counter
func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, met *grpc_prometheus.ServerMetrics, cert, key, clientCA string) ([]grpc.ServerOption, error) {
met.EnableHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
0.001, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4,
}),
)

panicsTotal := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_grpc_req_panics_recovered_total",
Help: "Total number of gRPC requests recovered from internal panic.",
})
grpcPanicRecoveryHandler := func(p interface{}) (err error) {
panicsTotal.Inc()
level.Error(logger).Log("msg", "recovered from panic", "panic", p, "stack", debug.Stack())
return status.Errorf(codes.Internal, "%s", p)
}

reg.MustRegister(met, panicsTotal)
opts := []grpc.ServerOption{
grpc.MaxSendMsgSize(math.MaxInt32),
grpc_middleware.WithUnaryServerChain(
met.UnaryServerInterceptor(),
tracing.UnaryServerInterceptor(tracer),
grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
grpc_middleware.WithStreamServerChain(
met.StreamServerInterceptor(),
tracing.StreamServerInterceptor(tracer),
grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
}
func defaultGRPCServerOpts(logger log.Logger, cert, key, clientCA string) ([]grpc.ServerOption, error) {
opts := []grpc.ServerOption{}

if key == "" && cert == "" {
if clientCA != "" {
Expand Down Expand Up @@ -325,6 +293,45 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o
return append(opts, grpc.Creds(credentials.NewTLS(tlsCfg))), nil
}

func newStoreGRPCServer(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, srv storepb.StoreServer, opts []grpc.ServerOption) *grpc.Server {
met := grpc_prometheus.NewServerMetrics()
met.EnableHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
0.001, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4,
}),
)
panicsTotal := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_grpc_req_panics_recovered_total",
Help: "Total number of gRPC requests recovered from internal panic.",
})
reg.MustRegister(met, panicsTotal)

grpcPanicRecoveryHandler := func(p interface{}) (err error) {
panicsTotal.Inc()
level.Error(logger).Log("msg", "recovered from panic", "panic", p, "stack", debug.Stack())
return status.Errorf(codes.Internal, "%s", p)
}
opts = append(opts,
grpc.MaxSendMsgSize(math.MaxInt32),
grpc_middleware.WithUnaryServerChain(
met.UnaryServerInterceptor(),
tracing.UnaryServerInterceptor(tracer),
grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
grpc_middleware.WithStreamServerChain(
met.StreamServerInterceptor(),
tracing.StreamServerInterceptor(tracer),
grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
)

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, srv)
met.InitializeMetrics(s)

return s
}

// TODO Remove once all components are migrated to the new defaultHTTPListener.
// metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics.
func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error {
Expand Down
9 changes: 2 additions & 7 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
v1 "github.com/thanos-io/thanos/pkg/query/api"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/ui"
"google.golang.org/grpc"
Expand Down Expand Up @@ -441,15 +440,11 @@ func runQuery(
}
logger := log.With(logger, "component", component.Query.String())

met := grpc_prometheus.NewServerMetrics()
opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, srvCert, srvKey, srvClientCA)
opts, err := defaultGRPCServerOpts(logger, srvCert, srvKey, srvClientCA)
if err != nil {
return errors.Wrapf(err, "build gRPC server")
}

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, proxy)
met.InitializeMetrics(s)
s := newStoreGRPCServer(logger, reg, tracer, proxy, opts)

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
Expand Down
9 changes: 2 additions & 7 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand All @@ -25,7 +24,6 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
Expand Down Expand Up @@ -268,14 +266,11 @@ func runReceive(
db := localStorage.Get()
tsdbStore := store.NewTSDBStore(log.With(logger, "component", "thanos-tsdb-store"), reg, db, component.Receive, lset)

met := grpc_prometheus.NewServerMetrics()
opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA)
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
s = grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, tsdbStore)
met.InitializeMetrics(s)
s := newStoreGRPCServer(logger, reg, tracer, tsdbStore, opts)

level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr)
return errors.Wrap(s.Serve(l), "serve gRPC")
Expand Down
9 changes: 2 additions & 7 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand Down Expand Up @@ -51,7 +50,6 @@ import (
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/ui"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -494,14 +492,11 @@ func runRule(

store := store.NewTSDBStore(logger, reg, db, component.Rule, lset)

met := grpc_prometheus.NewServerMetrics()
opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA)
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC options")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, store)
met.InitializeMetrics(s)
s := newStoreGRPCServer(logger, reg, tracer, store, opts)

g.Add(func() error {
return errors.Wrap(s.Serve(l), "serve gRPC")
Expand Down
9 changes: 2 additions & 7 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand All @@ -27,7 +26,6 @@ import (
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -221,14 +219,11 @@ func runSidecar(
return errors.Wrap(err, "create Prometheus store")
}

met := grpc_prometheus.NewServerMetrics()
opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA)
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}
s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, promStore)
met.InitializeMetrics(s)
s := newStoreGRPCServer(logger, reg, tracer, promStore, opts)

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
Expand Down
11 changes: 2 additions & 9 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand All @@ -17,8 +16,6 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/storepb"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -188,15 +185,11 @@ func runStore(
return errors.Wrap(err, "listen API address")
}

met := grpc_prometheus.NewServerMetrics()
opts, err := defaultGRPCServerOpts(logger, reg, tracer, met, cert, key, clientCA)
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "grpc server options")
}

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, bs)
met.InitializeMetrics(s)
s := newStoreGRPCServer(logger, reg, tracer, bs, opts)

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
Expand Down

0 comments on commit 1636ff3

Please sign in to comment.