Skip to content

Commit

Permalink
Add interceptor override and make ingester and cfg public (#3618)
Browse files Browse the repository at this point in the history
* Add interceptor override and make ingester and cfg public

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove extraneous comment

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>
  • Loading branch information
MichelHollands authored Apr 20, 2021
1 parent 31bb757 commit 515f82e
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 137 deletions.
37 changes: 23 additions & 14 deletions pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ type ClosableHealthAndIngesterClient struct {

// Config for an ingester client.
type Config struct {
PoolConfig distributor.PoolConfig `yaml:"pool_config,omitempty"`
RemoteTimeout time.Duration `yaml:"remote_timeout,omitempty"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
PoolConfig distributor.PoolConfig `yaml:"pool_config,omitempty"`
RemoteTimeout time.Duration `yaml:"remote_timeout,omitempty"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
GRPCUnaryClientInterceptors []grpc.UnaryClientInterceptor `yaml:"-"`
GRCPStreamClientInterceptors []grpc.StreamClientInterceptor `yaml:"-"`
}

// RegisterFlags registers flags.
Expand All @@ -63,7 +65,7 @@ func New(cfg Config, addr string) (HealthAndIngesterClient, error) {
grpc.WithDefaultCallOptions(cfg.GRPCClientConfig.CallOptions()...),
}

dialOpts, err := cfg.GRPCClientConfig.DialOption(instrumentation())
dialOpts, err := cfg.GRPCClientConfig.DialOption(instrumentation(&cfg))
if err != nil {
return nil, err
}
Expand All @@ -82,14 +84,21 @@ func New(cfg Config, addr string) (HealthAndIngesterClient, error) {
}, nil
}

func instrumentation() ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
return []grpc.UnaryClientInterceptor{
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.ClientUserHeaderInterceptor,
cortex_middleware.PrometheusGRPCUnaryInstrumentation(ingesterClientRequestDuration),
}, []grpc.StreamClientInterceptor{
otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()),
middleware.StreamClientUserHeaderInterceptor,
cortex_middleware.PrometheusGRPCStreamInstrumentation(ingesterClientRequestDuration),
}
func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
var unaryInterceptors []grpc.UnaryClientInterceptor
unaryInterceptors = append(unaryInterceptors, cfg.GRPCUnaryClientInterceptors...)
unaryInterceptors = append(unaryInterceptors,
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.ClientUserHeaderInterceptor,
cortex_middleware.PrometheusGRPCUnaryInstrumentation(ingesterClientRequestDuration),
)
var streamInterceptors []grpc.StreamClientInterceptor
streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...)
streamInterceptors = append(streamInterceptors,
otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()),
middleware.StreamClientUserHeaderInterceptor,
cortex_middleware.PrometheusGRPCStreamInstrumentation(ingesterClientRequestDuration),
)

return unaryInterceptors, streamInterceptors
}
34 changes: 17 additions & 17 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (c *Config) Validate() error {

// Loki is the root datastructure for Loki.
type Loki struct {
cfg Config
Cfg Config

// set during initialization
ModuleManager *modules.Manager
Expand All @@ -154,7 +154,7 @@ type Loki struct {
overrides *validation.Overrides
tenantConfigs *runtime.TenantConfigs
distributor *distributor.Distributor
ingester *ingester.Ingester
Ingester *ingester.Ingester
Querier *querier.Querier
ingesterQuerier *querier.IngesterQuerier
Store storage.Store
Expand All @@ -175,28 +175,28 @@ type Loki struct {
// New makes a new Loki.
func New(cfg Config) (*Loki, error) {
loki := &Loki{
cfg: cfg,
Cfg: cfg,
}

loki.setupAuthMiddleware()
if err := loki.setupModuleManager(); err != nil {
return nil, err
}
storage.RegisterCustomIndexClients(&loki.cfg.StorageConfig, prometheus.DefaultRegisterer)
storage.RegisterCustomIndexClients(&loki.Cfg.StorageConfig, prometheus.DefaultRegisterer)

return loki, nil
}

func (t *Loki) setupAuthMiddleware() {
t.cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{serverutil.RecoveryGRPCUnaryInterceptor}
t.cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{serverutil.RecoveryGRPCStreamInterceptor}
if t.cfg.AuthEnabled {
t.cfg.Server.GRPCMiddleware = append(t.cfg.Server.GRPCMiddleware, middleware.ServerUserHeaderInterceptor)
t.cfg.Server.GRPCStreamMiddleware = append(t.cfg.Server.GRPCStreamMiddleware, GRPCStreamAuthInterceptor)
t.Cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{serverutil.RecoveryGRPCUnaryInterceptor}
t.Cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{serverutil.RecoveryGRPCStreamInterceptor}
if t.Cfg.AuthEnabled {
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, middleware.ServerUserHeaderInterceptor)
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, GRPCStreamAuthInterceptor)
t.HTTPAuthMiddleware = middleware.AuthenticateUser
} else {
t.cfg.Server.GRPCMiddleware = append(t.cfg.Server.GRPCMiddleware, fakeGRPCAuthUnaryMiddleware)
t.cfg.Server.GRPCStreamMiddleware = append(t.cfg.Server.GRPCStreamMiddleware, fakeGRPCAuthStreamMiddleware)
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, fakeGRPCAuthUnaryMiddleware)
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, fakeGRPCAuthStreamMiddleware)
t.HTTPAuthMiddleware = fakeHTTPAuthMiddleware
}
}
Expand Down Expand Up @@ -224,7 +224,7 @@ func newDefaultConfig() *Config {

// Run starts Loki running, and blocks until a Loki stops.
func (t *Loki) Run() error {
serviceMap, err := t.ModuleManager.InitModuleServices(t.cfg.Target)
serviceMap, err := t.ModuleManager.InitModuleServices(t.Cfg.Target)
if err != nil {
return err
}
Expand All @@ -247,7 +247,7 @@ func (t *Loki) Run() error {
t.Server.HTTP.Path("/ready").Handler(t.readyHandler(sm))

// This adds a way to see the config and the changes compared to the defaults
t.Server.HTTP.Path("/config").HandlerFunc(configHandler(t.cfg, newDefaultConfig()))
t.Server.HTTP.Path("/config").HandlerFunc(configHandler(t.Cfg, newDefaultConfig()))

t.Server.HTTP.Path("/debug/fgprof").Handler(fgprof.Handler())

Expand Down Expand Up @@ -325,8 +325,8 @@ func (t *Loki) readyHandler(sm *services.Manager) http.HandlerFunc {

// Ingester has a special check that makes sure that it was able to register into the ring,
// and that all other ring entries are OK too.
if t.ingester != nil {
if err := t.ingester.CheckReady(r.Context()); err != nil {
if t.Ingester != nil {
if err := t.Ingester.CheckReady(r.Context()); err != nil {
http.Error(w, "Ingester not ready: "+err.Error(), http.StatusServiceUnavailable)
return
}
Expand Down Expand Up @@ -384,13 +384,13 @@ func (t *Loki) setupModuleManager() error {
}

// Add IngesterQuerier as a dependency for store when target is either ingester or querier.
if t.cfg.Target == Querier || t.cfg.Target == Ruler {
if t.Cfg.Target == Querier || t.Cfg.Target == Ruler {
deps[Store] = append(deps[Store], IngesterQuerier)
}

// If we are running Loki with boltdb-shipper as a single binary, without clustered mode(which should always be the case when using inmemory ring),
// we should start compactor as well for better user experience.
if storage.UsingBoltdbShipper(t.cfg.SchemaConfig.Configs) && t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Store == "inmemory" {
if storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Store == "inmemory" {
deps[All] = append(deps[All], Compactor)
}

Expand Down
Loading

0 comments on commit 515f82e

Please sign in to comment.