Skip to content

Commit

Permalink
Add recovery middleware to Ingester; re-add recovery middleware to Qu…
Browse files Browse the repository at this point in the history
…erier when not running in standalone mode (#4349)

* Add recovery middleware to ingester; re-add recovery middleware to querier when not running in standalone mode

* Re-add gRPC recovery middleware

* Break out gRPC recovery middleware from setupAuthMiddleware() into new function setupGRPCRecoveryMiddleware()
  • Loading branch information
JordanRushing authored Sep 28, 2021
1 parent 4784ea2 commit 5e8a204
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 3 deletions.
7 changes: 7 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/tracing"
serverutil "github.com/grafana/loki/pkg/util/server"
"github.com/grafana/loki/pkg/validation"
)

Expand Down Expand Up @@ -211,6 +212,7 @@ func New(cfg Config) (*Loki, error) {
}

loki.setupAuthMiddleware()
loki.setupGRPCRecoveryMiddleware()
if err := loki.setupModuleManager(); err != nil {
return nil, err
}
Expand All @@ -235,6 +237,11 @@ func (t *Loki) setupAuthMiddleware() {
})
}

func (t *Loki) setupGRPCRecoveryMiddleware() {
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, serverutil.RecoveryGRPCUnaryInterceptor)
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, serverutil.RecoveryGRPCStreamInterceptor)
}

func newDefaultConfig() *Config {
defaultConfig := &Config{}
defaultFS := flag.NewFlagSet("", flag.PanicOnError)
Expand Down
10 changes: 7 additions & 3 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,16 @@ func (t *Loki) initIngester() (_ services.Service, err error) {
if err != nil {
return
}

logproto.RegisterPusherServer(t.Server.GRPC, t.Ingester)
logproto.RegisterQuerierServer(t.Server.GRPC, t.Ingester)
logproto.RegisterIngesterServer(t.Server.GRPC, t.Ingester)
t.Server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.Ingester.FlushHandler))
t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(http.HandlerFunc(t.Ingester.ShutdownHandler))

httpMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
)
t.Server.HTTP.Path("/flush").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.FlushHandler)))
t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler)))

return t.Ingester, nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/querier/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func InitWorkerService(
// HTTP router with middleware to parse the tenant ID from the HTTP header and inject it into the
// request context, as well as make sure any x-www-url-formencoded params are correctly parsed
httpMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
authMiddleware,
serverutil.NewPrepopulateMiddleware(),
)
Expand Down

0 comments on commit 5e8a204

Please sign in to comment.