From 7b6f0577c3277b84230f0f2deba747b01ca2b2fa Mon Sep 17 00:00:00 2001 From: Dylan Guedes Date: Mon, 6 May 2024 09:04:04 -0300 Subject: [PATCH] feat: Querier: Split gRPC client into two. (#12726) **What this PR does / why we need it**: Split the gRPC client used by the querier into two, one for the communication with the scheduler, the other for communicating with the query-frontend. - This change is retrocompatible: you don't have to change anything to keep existing behavior. - To configure the custom scheduler grpc client, you can use the new `query_scheduler_grpc_client` config or the new CLI flag `querier.scheduler-grpc-client` - If you'd like to configure your frontend grpc client using a better named section, you can use the new `query_frontend_grpc_client` instead of the old `grpc_client_config`. Just make sure you don't use both at the same time, it will result in an error. This work is necessary for configuring custom behavior between `querier<->scheduler` vs `querier<->frontend`. A use case is configuring mTLS when a different certificate is used by queriers, schedulers and frontends. You can only configure a single `server_name` with our current setup, making it impossible. --- docs/sources/shared/configuration.md | 27 ++++-- pkg/loki/config_wrapper.go | 23 +++++ pkg/loki/config_wrapper_test.go | 103 ++++++++++++++++++++++ pkg/querier/worker/frontend_processor.go | 2 +- pkg/querier/worker/scheduler_processor.go | 4 +- pkg/querier/worker/worker.go | 55 ++++++++---- pkg/querier/worker/worker_test.go | 36 +++++++- tools/doc-generator/main.go | 6 ++ 8 files changed, 229 insertions(+), 27 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 674bb09ff0b9d..df40267e9ae84 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -543,19 +543,19 @@ The `alibabacloud_storage_config` block configures the connection to Alibaba Clo ```yaml # Name of OSS bucket. 
-# CLI flag: -common.storage.oss.bucketname
+# CLI flag: -<prefix>.storage.oss.bucketname
[bucket: <string> | default = ""]

# oss Endpoint to connect to.
-# CLI flag: -common.storage.oss.endpoint
+# CLI flag: -<prefix>.storage.oss.endpoint
[endpoint: <string> | default = ""]

# alibabacloud Access Key ID
-# CLI flag: -common.storage.oss.access-key-id
+# CLI flag: -<prefix>.storage.oss.access-key-id
[access_key_id: <string> | default = ""]

# alibabacloud Secret Access Key
-# CLI flag: -common.storage.oss.secret-access-key
+# CLI flag: -<prefix>.storage.oss.secret-access-key
[secret_access_key: <string> | default = ""]
```

@@ -2236,10 +2236,23 @@ The `frontend_worker` configures the worker - running within the Loki querier -
# CLI flag: -querier.id
[id: <string> | default = ""]

-# The grpc_client block configures the gRPC client used to communicate between a
-# client and server component in Loki.
+# Configures the querier gRPC client used to communicate with the
+# query-frontend. Shouldn't be used in conjunction with 'grpc_client_config'.
+# The CLI flags prefix for this block configuration is:
+# querier.frontend-grpc-client
+[query_frontend_grpc_client: <grpc_client>]
+
+# Configures the querier gRPC client used to communicate with the query-frontend
+# and with the query-scheduler if 'query_scheduler_grpc_client' isn't defined.
+# This shouldn't be used if 'query_frontend_grpc_client' is defined.
# The CLI flags prefix for this block configuration is: querier.frontend-client
[grpc_client_config: <grpc_client>]
+
+# Configures the querier gRPC client used to communicate with the
+# query-scheduler. If not defined, 'grpc_client_config' is used instead.
+# The CLI flags prefix for this block configuration is: +# querier.scheduler-grpc-client +[query_scheduler_grpc_client: ] ``` ### gcs_storage_config @@ -2297,6 +2310,8 @@ The `grpc_client` block configures the gRPC client used to communicate between a - `ingester.client` - `pattern-ingester.client` - `querier.frontend-client` +- `querier.frontend-grpc-client` +- `querier.scheduler-grpc-client` - `query-scheduler.grpc-client-config` - `ruler.client` - `tsdb.shipper.index-gateway-client.grpc` diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index a0e5fa5043d55..2a4789fb9e60f 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -125,6 +125,9 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source { applyIngesterFinalSleep(r) applyIngesterReplicationFactor(r) applyChunkRetain(r, &defaults) + if err := applyCommonQuerierWorkerGRPCConfig(r, &defaults); err != nil { + return err + } return nil } @@ -684,3 +687,23 @@ func applyChunkRetain(cfg, defaults *ConfigWrapper) { } } } + +func applyCommonQuerierWorkerGRPCConfig(cfg, defaults *ConfigWrapper) error { + if !reflect.DeepEqual(cfg.Worker.OldQueryFrontendGRPCClientConfig, defaults.Worker.OldQueryFrontendGRPCClientConfig) { + // User is using the old grpc configuration. + + if reflect.DeepEqual(cfg.Worker.NewQueryFrontendGRPCClientConfig, defaults.Worker.NewQueryFrontendGRPCClientConfig) { + // User is using the old grpc configuration only, we can just copy it to the new grpc client struct. + cfg.Worker.NewQueryFrontendGRPCClientConfig = cfg.Worker.OldQueryFrontendGRPCClientConfig + } else { + // User is using both, old and new way of configuring the grpc client, so we throw an error. + return fmt.Errorf("both `grpc_client_config` and `query_frontend_grpc_client` are set at the same time. 
Please use only one of them") + } + + if reflect.DeepEqual(cfg.Worker.QuerySchedulerGRPCClientConfig, defaults.Worker.QuerySchedulerGRPCClientConfig) { + // Since the scheduler grpc client is not set, we can just copy the old query frontend grpc client to the scheduler grpc client. + cfg.Worker.QuerySchedulerGRPCClientConfig = cfg.Worker.OldQueryFrontendGRPCClientConfig + } + } + return nil +} diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index bda2b8fa2596f..d010419770936 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -799,6 +799,109 @@ query_range: config, _ := testContext(configFileString, nil) assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Enabled) }) + + t.Run("querier worker grpc client behavior", func(t *testing.T) { + newConfigBothClientsSet := `--- +frontend_worker: + query_frontend_grpc_client: + tls_server_name: query-frontend + query_scheduler_grpc_client: + tls_server_name: query-scheduler +` + + oldConfig := `--- +frontend_worker: + grpc_client_config: + tls_server_name: query-frontend +` + + mixedConfig := `--- +frontend_worker: + grpc_client_config: + tls_server_name: query-frontend-old + query_frontend_grpc_client: + tls_server_name: query-frontend-new + query_scheduler_grpc_client: + tls_server_name: query-scheduler +` + t.Run("new configs are used", func(t *testing.T) { + asserts := func(config ConfigWrapper) { + require.EqualValues(t, "query-frontend", config.Worker.NewQueryFrontendGRPCClientConfig.TLS.ServerName) + require.EqualValues(t, "query-scheduler", config.Worker.QuerySchedulerGRPCClientConfig.TLS.ServerName) + // we never want to use zero values by default. 
+ require.NotEqualValues(t, 0, config.Worker.NewQueryFrontendGRPCClientConfig.MaxRecvMsgSize) + require.NotEqualValues(t, 0, config.Worker.QuerySchedulerGRPCClientConfig.MaxRecvMsgSize) + } + + yamlConfig, _, err := configWrapperFromYAML(t, newConfigBothClientsSet, nil) + require.NoError(t, err) + asserts(yamlConfig) + + // repeat the test using only cli flags. + cliFlags := []string{ + "-querier.frontend-grpc-client.tls-server-name=query-frontend", + "-querier.scheduler-grpc-client.tls-server-name=query-scheduler", + } + cliConfig, _, err := configWrapperFromYAML(t, emptyConfigString, cliFlags) + require.NoError(t, err) + asserts(cliConfig) + }) + + t.Run("old config works the same way", func(t *testing.T) { + asserts := func(config ConfigWrapper) { + require.EqualValues(t, "query-frontend", config.Worker.NewQueryFrontendGRPCClientConfig.TLS.ServerName) + require.EqualValues(t, "query-frontend", config.Worker.QuerySchedulerGRPCClientConfig.TLS.ServerName) + + // we never want to use zero values by default. + require.NotEqualValues(t, 0, config.Worker.NewQueryFrontendGRPCClientConfig.MaxRecvMsgSize) + require.NotEqualValues(t, 0, config.Worker.QuerySchedulerGRPCClientConfig.MaxRecvMsgSize) + } + + yamlConfig, _, err := configWrapperFromYAML(t, oldConfig, nil) + require.NoError(t, err) + asserts(yamlConfig) + + // repeat the test using only cli flags. + cliFlags := []string{ + "-querier.frontend-client.tls-server-name=query-frontend", + } + cliConfig, _, err := configWrapperFromYAML(t, emptyConfigString, cliFlags) + require.NoError(t, err) + asserts(cliConfig) + }) + + t.Run("mixed frontend clients throws an error", func(t *testing.T) { + _, _, err := configWrapperFromYAML(t, mixedConfig, nil) + require.Error(t, err) + + // repeat the test using only cli flags. 
+ _, _, err = configWrapperFromYAML(t, emptyConfigString, []string{ + "-querier.frontend-client.tls-server-name=query-frontend", + "-querier.frontend-grpc-client.tls-server-name=query-frontend", + }) + require.Error(t, err) + + // repeat the test mixing the YAML with cli flags. + _, _, err = configWrapperFromYAML(t, newConfigBothClientsSet, []string{ + "-querier.frontend-client.tls-server-name=query-frontend", + }) + require.Error(t, err) + }) + + t.Run("mix correct cli flags with YAML configs", func(t *testing.T) { + config, _, err := configWrapperFromYAML(t, newConfigBothClientsSet, []string{ + "-querier.scheduler-grpc-client.tls-enabled=true", + }) + require.NoError(t, err) + + require.EqualValues(t, "query-frontend", config.Worker.NewQueryFrontendGRPCClientConfig.TLS.ServerName) + require.EqualValues(t, "query-scheduler", config.Worker.QuerySchedulerGRPCClientConfig.TLS.ServerName) + // we never want to use zero values by default. + require.NotEqualValues(t, 0, config.Worker.NewQueryFrontendGRPCClientConfig.MaxRecvMsgSize) + require.NotEqualValues(t, 0, config.Worker.QuerySchedulerGRPCClientConfig.MaxRecvMsgSize) + require.True(t, config.Worker.QuerySchedulerGRPCClientConfig.TLSEnabled) + }) + }) } const defaultResulsCacheString = `--- diff --git a/pkg/querier/worker/frontend_processor.go b/pkg/querier/worker/frontend_processor.go index a0e3569359bfa..1327a30ae3190 100644 --- a/pkg/querier/worker/frontend_processor.go +++ b/pkg/querier/worker/frontend_processor.go @@ -30,7 +30,7 @@ func newFrontendProcessor(cfg Config, handler RequestHandler, log log.Logger, co log: log, handler: handler, codec: codec, - maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize, + maxMessageSize: cfg.NewQueryFrontendGRPCClientConfig.MaxSendMsgSize, querierID: cfg.QuerierID, } } diff --git a/pkg/querier/worker/scheduler_processor.go b/pkg/querier/worker/scheduler_processor.go index 00b08219e5dbe..97f6d8f4d1df9 100644 --- a/pkg/querier/worker/scheduler_processor.go +++ 
b/pkg/querier/worker/scheduler_processor.go
@@ -38,9 +38,9 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, m
log: log,
handler: handler,
codec: codec,
- maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize,
+ maxMessageSize: cfg.NewQueryFrontendGRPCClientConfig.MaxSendMsgSize,
querierID: cfg.QuerierID,
- grpcConfig: cfg.GRPCClientConfig,
+ grpcConfig: cfg.NewQueryFrontendGRPCClientConfig,
schedulerClientFactory: func(conn *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient {
return schedulerpb.NewSchedulerForQuerierClient(conn)
},
diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go
index bc41a49d9075d..7d7b46dc814f5 100644
--- a/pkg/querier/worker/worker.go
+++ b/pkg/querier/worker/worker.go
@@ -30,7 +30,10 @@ type Config struct {
QuerierID string `yaml:"id"`
- GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
+ NewQueryFrontendGRPCClientConfig grpcclient.Config `yaml:"query_frontend_grpc_client" doc:"description=Configures the querier gRPC client used to communicate with the query-frontend. Shouldn't be used in conjunction with 'grpc_client_config'."`
+ OldQueryFrontendGRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the querier gRPC client used to communicate with the query-frontend and with the query-scheduler if 'query_scheduler_grpc_client' isn't defined. This shouldn't be used if 'query_frontend_grpc_client' is defined."`
+
+ QuerySchedulerGRPCClientConfig grpcclient.Config `yaml:"query_scheduler_grpc_client" doc:"description=Configures the querier gRPC client used to communicate with the query-scheduler. If not defined, 'grpc_client_config' is used instead."`
}
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -39,14 +42,25 @@
f.DurationVar(&cfg.DNSLookupPeriod, "querier.dns-lookup-period", 3*time.Second, "How often to query DNS for query-frontend or query-scheduler address. 
Also used to determine how often to poll the scheduler-ring for addresses if the scheduler-ring is configured.") f.StringVar(&cfg.QuerierID, "querier.id", "", "Querier ID, sent to frontend service to identify requests from the same querier. Defaults to hostname.") - cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f) + // Register old client as the frontend-client flag for retro-compatibility. + cfg.OldQueryFrontendGRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f) + + cfg.NewQueryFrontendGRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-grpc-client", f) + cfg.QuerySchedulerGRPCClientConfig.RegisterFlagsWithPrefix("querier.scheduler-grpc-client", f) } func (cfg *Config) Validate() error { if cfg.FrontendAddress != "" && cfg.SchedulerAddress != "" { return errors.New("frontend address and scheduler address are mutually exclusive, please use only one") } - return cfg.GRPCClientConfig.Validate() + if err := cfg.NewQueryFrontendGRPCClientConfig.Validate(); err != nil { + return err + } + if err := cfg.OldQueryFrontendGRPCClientConfig.Validate(); err != nil { + return err + } + + return cfg.QuerySchedulerGRPCClientConfig.Validate() } // Handler for HTTP requests wrapped in protobuf messages. 
@@ -80,7 +94,6 @@ type processor interface { type querierWorker struct { *services.BasicService - cfg Config logger log.Logger processor processor @@ -92,6 +105,9 @@ type querierWorker struct { managers map[string]*processorManager metrics *Metrics + + grpcClientConfig grpcclient.Config + maxConcurrentRequests int } func NewQuerierWorker(cfg Config, rng ring.ReadRing, handler RequestHandler, logger log.Logger, reg prometheus.Registerer, codec RequestCodec) (services.Service, error) { @@ -105,16 +121,19 @@ func NewQuerierWorker(cfg Config, rng ring.ReadRing, handler RequestHandler, log metrics := NewMetrics(cfg, reg) var processor processor + var grpcCfg grpcclient.Config var servs []services.Service var address string switch { case rng != nil: level.Info(logger).Log("msg", "Starting querier worker using query-scheduler and scheduler ring for addresses") + grpcCfg = cfg.QuerySchedulerGRPCClientConfig processor, servs = newSchedulerProcessor(cfg, handler, logger, metrics, codec) case cfg.SchedulerAddress != "": level.Info(logger).Log("msg", "Starting querier worker connected to query-scheduler", "scheduler", cfg.SchedulerAddress) + grpcCfg = cfg.QuerySchedulerGRPCClientConfig address = cfg.SchedulerAddress processor, servs = newSchedulerProcessor(cfg, handler, logger, metrics, codec) @@ -122,26 +141,28 @@ func NewQuerierWorker(cfg Config, rng ring.ReadRing, handler RequestHandler, log level.Info(logger).Log("msg", "Starting querier worker connected to query-frontend", "frontend", cfg.FrontendAddress) address = cfg.FrontendAddress + grpcCfg = cfg.NewQueryFrontendGRPCClientConfig processor = newFrontendProcessor(cfg, handler, logger, codec) default: return nil, errors.New("unable to start the querier worker, need to configure one of frontend_address, scheduler_address, or a ring config in the query_scheduler config block") } - return newQuerierWorkerWithProcessor(cfg, metrics, logger, processor, address, rng, servs) + return newQuerierWorkerWithProcessor(grpcCfg, 
cfg.MaxConcurrent, cfg.DNSLookupPeriod, metrics, logger, processor, address, rng, servs) } -func newQuerierWorkerWithProcessor(cfg Config, metrics *Metrics, logger log.Logger, processor processor, address string, ring ring.ReadRing, servs []services.Service) (*querierWorker, error) { +func newQuerierWorkerWithProcessor(grpcCfg grpcclient.Config, maxConcReq int, dnsLookupPeriod time.Duration, metrics *Metrics, logger log.Logger, processor processor, address string, ring ring.ReadRing, servs []services.Service) (*querierWorker, error) { f := &querierWorker{ - cfg: cfg, - logger: logger, - managers: map[string]*processorManager{}, - processor: processor, - metrics: metrics, + maxConcurrentRequests: maxConcReq, + grpcClientConfig: grpcCfg, + logger: logger, + managers: map[string]*processorManager{}, + processor: processor, + metrics: metrics, } // Empty address is only used in tests, where individual targets are added manually. if address != "" { - w, err := util.NewDNSWatcher(address, cfg.DNSLookupPeriod, f) + w, err := util.NewDNSWatcher(address, dnsLookupPeriod, f) if err != nil { return nil, err } @@ -150,7 +171,7 @@ func newQuerierWorkerWithProcessor(cfg Config, metrics *Metrics, logger log.Logg } if ring != nil { - w, err := util.NewRingWatcher(log.With(logger, "component", "querier-scheduler-worker"), ring, cfg.DNSLookupPeriod, f) + w, err := util.NewRingWatcher(log.With(logger, "component", "querier-scheduler-worker"), ring, dnsLookupPeriod, f) if err != nil { return nil, err } @@ -245,17 +266,17 @@ func (w *querierWorker) resetConcurrency() { }() for _, m := range w.managers { - concurrency := w.cfg.MaxConcurrent / len(w.managers) + concurrency := w.maxConcurrentRequests / len(w.managers) // If max concurrency does not evenly divide into our frontends a subset will be chosen // to receive an extra connection. Frontend addresses were shuffled above so this will be a // random selection of frontends. 
- if index < w.cfg.MaxConcurrent%len(w.managers) { + if index < w.maxConcurrentRequests%len(w.managers) { level.Warn(w.logger).Log("msg", "max concurrency is not evenly divisible across targets, adding an extra connection", "addr", m.address) concurrency++ } - // If concurrency is 0 then MaxConcurrentRequests is less than the total number of + // If concurrency is 0 then maxConcurrentRequests is less than the total number of // frontends/schedulers. In order to prevent accidentally starving a frontend or scheduler we are just going to // always connect once to every target. This is dangerous b/c we may start exceeding LogQL // max concurrency. @@ -271,7 +292,7 @@ func (w *querierWorker) resetConcurrency() { func (w *querierWorker) connect(ctx context.Context, address string) (*grpc.ClientConn, error) { // Because we only use single long-running method, it doesn't make sense to inject user ID, send over tracing or add metrics. - opts, err := w.cfg.GRPCClientConfig.DialOption(nil, nil) + opts, err := w.grpcClientConfig.DialOption(nil, nil) if err != nil { return nil, err } diff --git a/pkg/querier/worker/worker_test.go b/pkg/querier/worker/worker_test.go index fb311925fb207..1633554b7a136 100644 --- a/pkg/querier/worker/worker_test.go +++ b/pkg/querier/worker/worker_test.go @@ -7,6 +7,8 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/dskit/crypto/tls" + "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/services" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -54,7 +56,7 @@ func TestResetConcurrency(t *testing.T) { MaxConcurrent: tt.maxConcurrent, } - w, err := newQuerierWorkerWithProcessor(cfg, NewMetrics(cfg, nil), log.NewNopLogger(), &mockProcessor{}, "", nil, nil) + w, err := newQuerierWorkerWithProcessor(cfg.QuerySchedulerGRPCClientConfig, cfg.MaxConcurrent, cfg.DNSLookupPeriod, NewMetrics(cfg, nil), log.NewNopLogger(), &mockProcessor{}, "", nil, nil) require.NoError(t, err) require.NoError(t, 
services.StartAndAwaitRunning(context.Background(), w))
@@ -93,3 +95,35 @@ func (m mockProcessor) processQueriesOnSingleStream(ctx context.Context, _ *grpc
}

func (m mockProcessor) notifyShutdown(_ context.Context, _ *grpc.ClientConn, _ string) {}
+
+func TestGRPCConfigBehavior(t *testing.T) {
+	logger := log.NewNopLogger()
+
+	t.Run("uses separated GRPC TLS server names", func(t *testing.T) {
+		cfg := Config{
+			SchedulerAddress: "scheduler:9095",
+			QuerySchedulerGRPCClientConfig: grpcclient.Config{
+				TLS: tls.ClientConfig{
+					ServerName: "query-scheduler",
+				},
+			},
+			NewQueryFrontendGRPCClientConfig: grpcclient.Config{
+				TLS: tls.ClientConfig{
+					ServerName: "query-frontend",
+				},
+			},
+		}
+
+		qw, err := NewQuerierWorker(cfg, nil, nil, logger, nil, nil)
+		require.NoError(t, err)
+		require.NoError(t, services.StopAndAwaitTerminated(context.Background(), qw))
+
+		// grpc client the querier uses to talk to the scheduler, so the expected server name is "query-scheduler".
+		castedQw := qw.(*querierWorker)
+		require.Equal(t, "query-scheduler", castedQw.grpcClientConfig.TLS.ServerName)
+
+		// grpc client the querier uses to return results to the frontend, so the expected server name is "query-frontend".
+		sp := castedQw.processor.(*schedulerProcessor)
+		require.Equal(t, "query-frontend", sp.grpcConfig.TLS.ServerName)
+	})
+}
diff --git a/tools/doc-generator/main.go b/tools/doc-generator/main.go
index 24f6c82ef0cff..7648bb407f6fc 100644
--- a/tools/doc-generator/main.go
+++ b/tools/doc-generator/main.go
@@ -104,6 +104,12 @@ func generateBlocksMarkdown(blocks []*parse.ConfigBlock) string {
		return 1
	}
+	if a.FlagsPrefix < b.FlagsPrefix {
+		return -1
+	}
+	if a.FlagsPrefix > b.FlagsPrefix {
+		return 1
+	}
	return 0
})