From d1d202cc933c8ae695866fd7ec03b2cb27156293 Mon Sep 17 00:00:00 2001 From: Jordan Rushing Date: Fri, 17 Sep 2021 16:03:25 -0500 Subject: [PATCH 1/3] Add recovery middleware to ingester; re-add recovery middleware to querier when not running in standalone mode --- pkg/loki/modules.go | 10 +++++++--- pkg/querier/worker_service.go | 1 + 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index c334cf8501b4..fd9a6fb712db 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -238,12 +238,16 @@ func (t *Loki) initIngester() (_ services.Service, err error) { if err != nil { return } - logproto.RegisterPusherServer(t.Server.GRPC, t.Ingester) logproto.RegisterQuerierServer(t.Server.GRPC, t.Ingester) logproto.RegisterIngesterServer(t.Server.GRPC, t.Ingester) - t.Server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.Ingester.FlushHandler)) - t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(http.HandlerFunc(t.Ingester.ShutdownHandler)) + + httpMiddleware := middleware.Merge( + serverutil.RecoveryHTTPMiddleware, + ) + t.Server.HTTP.Path("/flush").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.FlushHandler))) + t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler))) + return t.Ingester, nil } diff --git a/pkg/querier/worker_service.go b/pkg/querier/worker_service.go index 6650ae24c747..9203a459e67b 100644 --- a/pkg/querier/worker_service.go +++ b/pkg/querier/worker_service.go @@ -104,6 +104,7 @@ func InitWorkerService( // HTTP router with middleware to parse the tenant ID from the HTTP header and inject it into the // request context, as well as make sure any x-www-url-formencoded params are correctly parsed httpMiddleware := middleware.Merge( + serverutil.RecoveryHTTPMiddleware, authMiddleware, serverutil.NewPrepopulateMiddleware(), ) From 3103a1925588e93c7f87985e74f0553ec3641499 Mon Sep 17 00:00:00 2001 From: Jordan Rushing Date: Mon, 27 Sep 2021 11:23:22 -0500 Subject: [PATCH 2/3] Re-add gRPC recovery middleware --- pkg/loki/loki.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 1f59e11d2d28..6b3e9eae7c5c 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -29,6 +29,7 @@ import ( "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" "github.com/weaveworks/common/signals" + "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/loki/pkg/distributor" @@ -43,6 +44,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" "github.com/grafana/loki/pkg/tracing" + serverutil "github.com/grafana/loki/pkg/util/server" "github.com/grafana/loki/pkg/validation" ) @@ -220,6 +222,8 @@ func New(cfg Config) (*Loki, error) { } func (t *Loki) setupAuthMiddleware() { + t.Cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{serverutil.RecoveryGRPCUnaryInterceptor} + t.Cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{serverutil.RecoveryGRPCStreamInterceptor} // Don't check auth header on TransferChunks, as we weren't originally // sending it and this could cause transfers to fail on update. t.HTTPAuthMiddleware = fakeauth.SetupAuthMiddleware(&t.Cfg.Server, t.Cfg.AuthEnabled, From 87892c854d729443eb9b060bc27d513d83b71966 Mon Sep 17 00:00:00 2001 From: Jordan Rushing Date: Mon, 27 Sep 2021 14:10:08 -0500 Subject: [PATCH 3/3] Break out gRPC recovery middleware from setupAuthMiddleware() into new function setupGRPCRecoveryMiddleware() --- pkg/loki/loki.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 6b3e9eae7c5c..7ddff964a087 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -29,7 +29,6 @@ import ( "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" "github.com/weaveworks/common/signals" - "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/loki/pkg/distributor" @@ -213,6 +212,7 @@ func New(cfg Config) (*Loki, error) { } loki.setupAuthMiddleware() + loki.setupGRPCRecoveryMiddleware() if err := loki.setupModuleManager(); err != nil { return nil, err } @@ -222,8 +222,6 @@ func New(cfg Config) (*Loki, error) { } func (t *Loki) setupAuthMiddleware() { - t.Cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{serverutil.RecoveryGRPCUnaryInterceptor} - t.Cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{serverutil.RecoveryGRPCStreamInterceptor} // Don't check auth header on TransferChunks, as we weren't originally // sending it and this could cause transfers to fail on update. t.HTTPAuthMiddleware = fakeauth.SetupAuthMiddleware(&t.Cfg.Server, t.Cfg.AuthEnabled, @@ -239,6 +237,11 @@ func (t *Loki) setupAuthMiddleware() { }) } +func (t *Loki) setupGRPCRecoveryMiddleware() { + t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, serverutil.RecoveryGRPCUnaryInterceptor) + t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, serverutil.RecoveryGRPCStreamInterceptor) +} + func newDefaultConfig() *Config { defaultConfig := &Config{} defaultFS := flag.NewFlagSet("", flag.PanicOnError)