From ab9bd25489c3f18ae5c15914aaf82f8ffa9b0281 Mon Sep 17 00:00:00 2001 From: Annanay Date: Tue, 8 Dec 2020 17:24:12 +0530 Subject: [PATCH 1/9] Checkpoint: first pass on adding all QF components Signed-off-by: Annanay --- cmd/tempo/app/app.go | 25 ++-- cmd/tempo/app/modules.go | 51 +++++++- modules/frontend/frontend.go | 227 +++++++++++++++++++++++++++++++++++ modules/querier/config.go | 2 + modules/querier/querier.go | 42 +++++-- tempodb/tempodb.go | 7 +- 6 files changed, 324 insertions(+), 30 deletions(-) create mode 100644 modules/frontend/frontend.go diff --git a/cmd/tempo/app/app.go b/cmd/tempo/app/app.go index fe0de180cba..7bb053e6a06 100644 --- a/cmd/tempo/app/app.go +++ b/cmd/tempo/app/app.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" + cortex_frontend "github.com/cortexproject/cortex/pkg/querier/frontend" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" "github.com/cortexproject/cortex/pkg/util" @@ -25,6 +26,7 @@ import ( "github.com/grafana/tempo/modules/compactor" "github.com/grafana/tempo/modules/distributor" "github.com/grafana/tempo/modules/ingester" + "github.com/grafana/tempo/modules/frontend" ingester_client "github.com/grafana/tempo/modules/ingester/client" "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/modules/querier" @@ -40,15 +42,17 @@ type Config struct { AuthEnabled bool `yaml:"auth_enabled,omitempty"` HTTPPrefix string `yaml:"http_prefix"` - Server server.Config `yaml:"server,omitempty"` - Distributor distributor.Config `yaml:"distributor,omitempty"` - IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"` - Querier querier.Config `yaml:"querier,omitempty"` - Compactor compactor.Config `yaml:"compactor,omitempty"` - Ingester ingester.Config `yaml:"ingester,omitempty"` - StorageConfig storage.Config `yaml:"storage,omitempty"` - LimitsConfig overrides.Limits `yaml:"overrides,omitempty"` - MemberlistKV memberlist.KVConfig `yaml:"memberlist,omitempty"` + Server server.Config `yaml:"server,omitempty"` + Distributor distributor.Config `yaml:"distributor,omitempty"` + IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"` + Querier querier.Config `yaml:"querier,omitempty"` + Frontend frontend.FrontendConfig `yaml:"frontend,omitempty"` + Worker cortex_frontend.WorkerConfig `yaml:"frontend_worker"` + Compactor compactor.Config `yaml:"compactor,omitempty"` + Ingester ingester.Config `yaml:"ingester,omitempty"` + StorageConfig storage.Config `yaml:"storage,omitempty"` + LimitsConfig overrides.Limits `yaml:"overrides,omitempty"` + MemberlistKV memberlist.KVConfig `yaml:"memberlist,omitempty"` } // RegisterFlagsAndApplyDefaults registers flag. @@ -78,6 +82,8 @@ func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { c.Distributor.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "distributor"), f) c.Ingester.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "ingester"), f) c.Querier.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "querier"), f) + c.Frontend.ApplyDefaults() + // todo: add worker defaults c.Compactor.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "compactor"), f) c.StorageConfig.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "storage"), f) @@ -110,6 +116,7 @@ type App struct { overrides *overrides.Overrides distributor *distributor.Distributor querier *querier.Querier + frontend *cortex_frontend.Frontend compactor *compactor.Compactor ingester *ingester.Ingester store storage.Store diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index 88bf43fb753..955935a7d98 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -6,6 +6,9 @@ import ( "os" "github.com/cortexproject/cortex/pkg/cortex" + cortex_querier "github.com/cortexproject/cortex/pkg/querier" + cortex_frontend "github.com/cortexproject/cortex/pkg/querier/frontend" + httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv/codec" "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" @@ -19,6 +22,7 @@ import ( "github.com/grafana/tempo/modules/compactor" "github.com/grafana/tempo/modules/distributor" + "github.com/grafana/tempo/modules/frontend" "github.com/grafana/tempo/modules/ingester" "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/modules/querier" @@ -35,6 +39,7 @@ const ( Distributor string = "distributor" Ingester string = "ingester" Querier string = "querier" + Frontend string = "frontend" Compactor string = "compactor" Store string = "store" MemberlistKV string = "memberlist-kv" @@ -133,10 +138,48 @@ func (t *App) initQuerier() (services.Service, error) { tracesHandler := middleware.Merge( t.httpAuthMiddleware, ).Wrap(http.HandlerFunc(t.querier.TraceByIDHandler)) - t.server.HTTP.Handle("/api/traces/{traceID}", tracesHandler) - return t.querier, nil + worker, err := cortex_frontend.NewWorker( + t.cfg.Worker, + cortex_querier.Config{ + MaxConcurrent: t.cfg.Querier.MaxConcurrent, + }, + httpgrpc_server.NewServer(tracesHandler), + util.Logger, + ) + if err != nil { + return nil, fmt.Errorf("failed to create frontend worker %w", err) + } + + err = querier.RegisterSubservices(worker) + if err != nil { + return nil, fmt.Errorf("failed to register frontend worker %w", err) + } + + return querier, nil +} + +func (t *App) initQueryFrontend() (services.Service, error) { + var err error + t.frontend, err = cortex_frontend.New(t.cfg.Frontend.Config, util.Logger, prometheus.DefaultRegisterer) + if err != nil { + return nil, err + } + + // custom tripperware that splits requests + tripperware, err := frontend.NewTripperware(t.cfg.Frontend, util.Logger, prometheus.DefaultRegisterer) + if err != nil { + return nil, err + } + // tripperware will be called before f.roundTripper (which calls roundtripgrpc) + t.frontend.Wrap(tripperware) + + cortex_frontend.RegisterFrontendServer(t.server.GRPC, t.frontend) + // register at a different endpoint for now + t.server.HTTP.Handle("/api/traces/frontend/{traceID}", t.frontend.Handler()) + + return nil, nil } func (t *App) initCompactor() (services.Service, error) { @@ -197,6 +240,7 @@ func (t *App) setupModuleManager() error { mm.RegisterModule(Distributor, t.initDistributor) mm.RegisterModule(Ingester, t.initIngester) mm.RegisterModule(Querier, t.initQuerier) + mm.RegisterModule(Frontend, t.initQueryFrontend) mm.RegisterModule(Compactor, t.initCompactor) mm.RegisterModule(Store, t.initStore, modules.UserInvisibleModule) mm.RegisterModule(All, nil) @@ -206,12 +250,13 @@ func (t *App) setupModuleManager() error { // Overrides: nil, // Store: nil, // MemberlistKV: nil, + Frontend: {Server}, Ring: {Server, MemberlistKV}, Distributor: {Ring, Server, Overrides}, Ingester: {Store, Server, Overrides, MemberlistKV}, Querier: {Store, Ring}, Compactor: {Store, Server, MemberlistKV}, - All: {Compactor, Querier, Ingester, Distributor}, + All: {Compactor, Frontend, Querier, Ingester, Distributor}, } for mod, targets := range deps { diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go new file mode 100644 index 00000000000..037f63d3fec --- /dev/null +++ b/modules/frontend/frontend.go @@ -0,0 +1,227 @@ +package frontend + +import ( + "context" + "encoding/binary" + "fmt" + "github.com/cortexproject/cortex/pkg/querier/frontend" + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/user" + "net/http" +) + +type FrontendConfig struct { + frontend.Config `yaml:",inline"` + ShardNum int `yaml:"shard_num,omitempty"` +} + +func (cfg *FrontendConfig) ApplyDefaults() { + cfg.Config.CompressResponses = false + cfg.Config.DownstreamURL = "" + cfg.Config.LogQueriesLongerThan = 0 + cfg.Config.MaxOutstandingPerTenant = 100 + cfg.ShardNum = 4 +} + +// NewTripperware returns a Tripperware configured with a middleware to split requests +func NewTripperware(cfg FrontendConfig, logger log.Logger, registerer prometheus.Registerer) (frontend.Tripperware, error) { + level.Info(logger).Log("msg", "creating tripperware in query frontend to shard queries") + queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempo", + Name: "query_frontend_queries_total", + Help: "Total queries sent per tenant.", + }, []string{"user"}) + + return func(next http.RoundTripper) http.RoundTripper { + // get the http request, add some custom parameters to it, split it, and call downstream roundtripper + rt := NewRoundTripper(next, ShardingWare(cfg.ShardNum, logger, registerer)) + return frontend.RoundTripFunc(func(r *http.Request) (*http.Response, error) { + level.Info(util.Logger).Log("msg", "request received by custom tripperware") + orgID := r.Header.Get(user.OrgIDHeaderName) + queriesPerTenant.WithLabelValues(orgID).Inc() + + r = r.WithContext(user.InjectOrgID(r.Context(), orgID)) + return rt.RoundTrip(r) + }) + }, nil +} + +type Handler interface { + Do(*http.Request) (*http.Response, error) +} + +type Middleware interface { + Wrap(Handler) Handler +} + +// MiddlewareFunc is like http.HandlerFunc, but for Middleware. +type MiddlewareFunc func(Handler) Handler + +// Wrap implements Middleware. +func (q MiddlewareFunc) Wrap(h Handler) Handler { + return q(h) +} + +func MergeMiddlewares(middleware ...Middleware) Middleware { + return MiddlewareFunc(func(next Handler) Handler { + for i := len(middleware) - 1; i >= 0; i-- { + next = middleware[i].Wrap(next) + } + return next + }) +} + +type roundTripper struct { + next http.RoundTripper + handler Handler +} + +// NewRoundTripper merges a set of middlewares into an handler, then inject it into the `next` roundtripper +func NewRoundTripper(next http.RoundTripper, middlewares ...Middleware) http.RoundTripper { + transport := roundTripper{ + next: next, + } + transport.handler = MergeMiddlewares(middlewares...).Wrap(&transport) + return transport +} + +func (q roundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + level.Info(util.Logger).Log("blockStart", r.URL.Query().Get("blockStart"), "blockEnd", r.URL.Query().Get("blockEnd")) + return q.handler.Do(r) +} + +// Do implements Handler. +func (q roundTripper) Do(r *http.Request) (*http.Response, error) { + level.Info(util.Logger).Log("msg", "roundTripper.Do called") + return q.next.RoundTrip(r) +} + +func ShardingWare(shardNum int, logger log.Logger, registerer prometheus.Registerer) Middleware { + return MiddlewareFunc(func(next Handler) Handler { + return shardQuery{ + next: next, + shardNum: shardNum, + logger: logger, + splitByCounter: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempo", + Name: "query_frontend_split_queries_total", + Help: "Total number of underlying query requests after sharding", + }, []string{"user"}), + } + }) +} + +type shardQuery struct { + next Handler + shardNum int + logger log.Logger + // Metrics. + splitByCounter *prometheus.CounterVec +} + +// Do implements Handler +func (s shardQuery) Do(r *http.Request) (*http.Response, error) { + level.Info(s.logger).Log("msg", "shardQuery called") + + userID, err := user.ExtractOrgID(r.Context()) + if err != nil { + return nil, err + } + + // create sharded queries + boundaryBytes := make([][]byte, s.shardNum+1) + for i := 0; i < s.shardNum+1; i ++ { + boundaryBytes[i] = make([]byte, 0) + } + const MaxUint = ^uint64(0) + const MaxInt = int64(MaxUint >> 1) + for i := 0; i < s.shardNum; i++ { + binary.PutVarint(boundaryBytes[i], MaxInt*(int64(i))/int64(s.shardNum)) + binary.PutVarint(boundaryBytes[i], 0) + } + binary.PutVarint(boundaryBytes[s.shardNum], MaxInt) + binary.PutVarint(boundaryBytes[s.shardNum], MaxInt) + + reqs := make([]*http.Request, s.shardNum) + for i := 0; i < s.shardNum; i++ { + reqs[i] = r + reqs[i].URL.Query().Add("blockStart", string(boundaryBytes[i])) + reqs[i].URL.Query().Add("blockEnd", string(boundaryBytes[i+1])) + } + s.splitByCounter.WithLabelValues(userID).Add(float64(s.shardNum)) + + rrs, err := DoRequests(r.Context(), s.next, reqs) + if err != nil { + return nil, err + } + + // todo: add merging logic here if there are more than one results + for _, rr := range rrs { + if rr.Response.StatusCode == http.StatusOK { + return rr.Response, nil + } + } + + return nil, fmt.Errorf("trace not found") +} + +// RequestResponse contains a request response and the respective request that was used. +type RequestResponse struct { + Request *http.Request + Response *http.Response +} + +// DoRequests executes a list of requests in parallel. The limits parameters is used to limit parallelism per single request. +func DoRequests(ctx context.Context, downstream Handler, reqs []*http.Request) ([]RequestResponse, error) { + // If one of the requests fail, we want to be able to cancel the rest of them. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Feed all requests to a bounded intermediate channel to limit parallelism. + intermediate := make(chan *http.Request) + go func() { + for _, req := range reqs { + intermediate <- req + } + close(intermediate) + }() + + respChan, errChan := make(chan RequestResponse), make(chan error) + // todo: make this configurable using limits + parallelism := 10 + if parallelism > len(reqs) { + parallelism = len(reqs) + } + for i := 0; i < parallelism; i++ { + go func() { + for req := range intermediate { + resp, err := downstream.Do(req) + if err != nil { + errChan <- err + } else { + respChan <- RequestResponse{req, resp} + } + } + }() + } + + resps := make([]RequestResponse, 0, len(reqs)) + var firstErr error + for range reqs { + select { + case resp := <-respChan: + resps = append(resps, resp) + case err := <-errChan: + if firstErr == nil { + cancel() + firstErr = err + } + } + } + + return resps, firstErr +} diff --git a/modules/querier/config.go b/modules/querier/config.go index b505b284e87..b8ebd56ec9a 100644 --- a/modules/querier/config.go +++ b/modules/querier/config.go @@ -9,10 +9,12 @@ import ( type Config struct { QueryTimeout time.Duration `yaml:"query_timeout"` ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"` + MaxConcurrent int `yaml:"max_concurrent"` } // RegisterFlagsAndApplyDefaults register flags. func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { cfg.QueryTimeout = 10 * time.Second cfg.ExtraQueryDelay = 0 + cfg.MaxConcurrent = 20 } diff --git a/modules/querier/querier.go b/modules/querier/querier.go index fb6a118adb2..a4658704199 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -55,6 +55,7 @@ type Querier struct { store storage.Store limits *overrides.Overrides + subservices *services.Manager subservicesWatcher *services.FailureWatcher } @@ -82,34 +83,49 @@ func New(cfg Config, clientCfg ingester_client.Config, ring ring.ReadRing, store limits: limits, } - q.subservicesWatcher = services.NewFailureWatcher() - q.subservicesWatcher.WatchService(q.pool) - q.Service = services.NewBasicService(q.starting, q.running, q.stopping) return q, nil } +// todo: fix name, its confusing +func (q *Querier) RegisterSubservices(w services.Service) error { + var err error + q.subservices, err = services.NewManager(w, q.pool) + q.subservicesWatcher = services.NewFailureWatcher() + q.subservicesWatcher.WatchManager(q.subservices) + return err +} + func (q *Querier) starting(ctx context.Context) error { - err := services.StartAndAwaitRunning(ctx, q.pool) - if err != nil { - return fmt.Errorf("failed to start pool %w", err) + if q.subservices != nil { + err := services.StartManagerAndAwaitHealthy(ctx, q.subservices) + if err != nil { + return fmt.Errorf("failed to start subservices %w", err) + } } - return nil } func (q *Querier) running(ctx context.Context) error { - select { - case <-ctx.Done(): - return nil - case err := <-q.subservicesWatcher.Chan(): - return fmt.Errorf("querier subservices failed %w", err) + if q.subservices != nil { + select { + case <-ctx.Done(): + return nil + case err := <-q.subservicesWatcher.Chan(): + return fmt.Errorf("subservices failed %w", err) + } + } else { + <-ctx.Done() } + return nil } // Called after distributor is asked to stop via StopAsync. func (q *Querier) stopping(_ error) error { - return services.StopAndAwaitTerminated(context.Background(), q.pool) + if q.subservices != nil { + return services.StopManagerAndAwaitStopped(context.Background(), q.subservices) + } + return nil } // FindTraceByID implements tempopb.Querier. diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index d1fa4033c39..1943d610b0e 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -300,8 +300,6 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id encoding.I meta := payload.(*encoding.BlockMeta) shardKey := bloom.ShardKeyForTraceID(id) - span.SetTag("blockID", meta.BlockID) - span.SetTag("shardKey", shardKey) bloomBytes, err := rw.r.Bloom(ctx, meta.BlockID, tenantID, shardKey) if err != nil { return nil, fmt.Errorf("error retrieving bloom %v", err) @@ -358,14 +356,13 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id encoding.I break } } - level.Info(logger).Log("msg", "searching for trace in block", "findTraceID", hex.EncodeToString(id), "block", meta.BlockID, "found", foundObject != nil) - span.LogFields(ot_log.String("msg", "complete"), - ot_log.String("findTraceID", hex.EncodeToString(id)), + span.LogFields(ot_log.String("blockID", meta.BlockID.String()), ot_log.Bool("found", foundObject != nil), ot_log.Int("bytes", len(foundObject))) return foundObject, nil }) + level.Info(logger).Log("msg", "search for trace complete", "findTraceID", hex.EncodeToString(id), "found", len(foundBytes) > 0) return foundBytes, metrics, err } From 39f7df09cfbc1e98d3f371070f88175103b65aba Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 9 Dec 2020 18:11:04 +0530 Subject: [PATCH 2/9] Add shardsplit test, move config to separate file Signed-off-by: Annanay --- cmd/tempo/app/app.go | 2 +- cmd/tempo/app/modules.go | 4 +- modules/frontend/config.go | 21 +++++++ modules/frontend/frontend.go | 94 +++++++++++++++++-------------- modules/frontend/frontend_test.go | 44 +++++++++++++++ 5 files changed, 121 insertions(+), 44 deletions(-) create mode 100644 modules/frontend/config.go create mode 100644 modules/frontend/frontend_test.go diff --git a/cmd/tempo/app/app.go b/cmd/tempo/app/app.go index 7bb053e6a06..272281e0b12 100644 --- a/cmd/tempo/app/app.go +++ b/cmd/tempo/app/app.go @@ -82,7 +82,7 @@ func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { c.Distributor.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "distributor"), f) c.Ingester.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "ingester"), f) c.Querier.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "querier"), f) - c.Frontend.ApplyDefaults() + c.Frontend.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "frontend"), f) // todo: add worker defaults c.Compactor.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "compactor"), f) c.StorageConfig.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "storage"), f) diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index 955935a7d98..f648e438425 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -138,7 +138,7 @@ func (t *App) initQuerier() (services.Service, error) { tracesHandler := middleware.Merge( t.httpAuthMiddleware, ).Wrap(http.HandlerFunc(t.querier.TraceByIDHandler)) - t.server.HTTP.Handle("/api/traces/{traceID}", tracesHandler) + t.server.HTTP.Handle("/querier/api/traces/{traceID}", tracesHandler) worker, err := cortex_frontend.NewWorker( t.cfg.Worker, @@ -177,7 +177,7 @@ func (t *App) initQueryFrontend() (services.Service, error) { cortex_frontend.RegisterFrontendServer(t.server.GRPC, t.frontend) // register at a different endpoint for now - t.server.HTTP.Handle("/api/traces/frontend/{traceID}", t.frontend.Handler()) + t.server.HTTP.Handle("/api/traces/{traceID}", t.frontend.Handler()) return nil, nil } diff --git a/modules/frontend/config.go b/modules/frontend/config.go new file mode 100644 index 00000000000..30f82459d58 --- /dev/null +++ b/modules/frontend/config.go @@ -0,0 +1,21 @@ +package frontend + +import ( + "flag" + + "github.com/cortexproject/cortex/pkg/querier/frontend" +) + +type FrontendConfig struct { + frontend.Config `yaml:",inline"` + ShardNum int `yaml:"shard_num,omitempty"` +} + +func (cfg *FrontendConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { + cfg.Config.CompressResponses = false + cfg.Config.DownstreamURL = "" + cfg.Config.LogQueriesLongerThan = 0 + cfg.Config.MaxOutstandingPerTenant = 100 + cfg.ShardNum = 4 +} + diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go index 037f63d3fec..0a8e4aca29d 100644 --- a/modules/frontend/frontend.go +++ b/modules/frontend/frontend.go @@ -3,7 +3,10 @@ package frontend import ( "context" "encoding/binary" + "encoding/hex" "fmt" + "net/http" + "github.com/cortexproject/cortex/pkg/querier/frontend" "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log" @@ -11,21 +14,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/user" - "net/http" -) - -type FrontendConfig struct { - frontend.Config `yaml:",inline"` - ShardNum int `yaml:"shard_num,omitempty"` -} -func (cfg *FrontendConfig) ApplyDefaults() { - cfg.Config.CompressResponses = false - cfg.Config.DownstreamURL = "" - cfg.Config.LogQueriesLongerThan = 0 - cfg.Config.MaxOutstandingPerTenant = 100 - cfg.ShardNum = 4 -} + "github.com/grafana/tempo/modules/querier" +) // NewTripperware returns a Tripperware configured with a middleware to split requests func NewTripperware(cfg FrontendConfig, logger log.Logger, registerer prometheus.Registerer) (frontend.Tripperware, error) { @@ -33,8 +24,8 @@ func NewTripperware(cfg FrontendConfig, logger log.Logger, registerer prometheus queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", Name: "query_frontend_queries_total", - Help: "Total queries sent per tenant.", - }, []string{"user"}) + Help: "Total queries received per tenant.", + }, []string{"tenant"}) return func(next http.RoundTripper) http.RoundTripper { // get the http request, add some custom parameters to it, split it, and call downstream roundtripper @@ -103,22 +94,24 @@ func (q roundTripper) Do(r *http.Request) (*http.Response, error) { func ShardingWare(shardNum int, logger log.Logger, registerer prometheus.Registerer) Middleware { return MiddlewareFunc(func(next Handler) Handler { return shardQuery{ - next: next, - shardNum: shardNum, - logger: logger, + next: next, + queryShards: shardNum, + logger: logger, splitByCounter: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", Name: "query_frontend_split_queries_total", - Help: "Total number of underlying query requests after sharding", - }, []string{"user"}), + Help: "Total number of underlying query requests after sharding.", + }, []string{"tenant"}), } }) } type shardQuery struct { - next Handler - shardNum int - logger log.Logger + next Handler + queryShards int + logger log.Logger + blockBoundaries [][]byte + // Metrics. splitByCounter *prometheus.CounterVec } @@ -132,27 +125,24 @@ func (s shardQuery) Do(r *http.Request) (*http.Response, error) { return nil, err } - // create sharded queries - boundaryBytes := make([][]byte, s.shardNum+1) - for i := 0; i < s.shardNum+1; i ++ { - boundaryBytes[i] = make([]byte, 0) - } - const MaxUint = ^uint64(0) - const MaxInt = int64(MaxUint >> 1) - for i := 0; i < s.shardNum; i++ { - binary.PutVarint(boundaryBytes[i], MaxInt*(int64(i))/int64(s.shardNum)) - binary.PutVarint(boundaryBytes[i], 0) + // only need to initialise boundaries once + if len(s.blockBoundaries) == 0 { + s.blockBoundaries = createBlockShards(s.queryShards) } - binary.PutVarint(boundaryBytes[s.shardNum], MaxInt) - binary.PutVarint(boundaryBytes[s.shardNum], MaxInt) - reqs := make([]*http.Request, s.shardNum) - for i := 0; i < s.shardNum; i++ { + reqs := make([]*http.Request, s.queryShards) + for i := 0; i < s.queryShards; i++ { reqs[i] = r - reqs[i].URL.Query().Add("blockStart", string(boundaryBytes[i])) - reqs[i].URL.Query().Add("blockEnd", string(boundaryBytes[i+1])) + reqs[i].URL.Query().Add(querier.BlockStartKey, hex.EncodeToString(s.blockBoundaries[i])) + reqs[i].URL.Query().Add(querier.BlockEndKey, hex.EncodeToString(s.blockBoundaries[i+1])) + + if i==0 { + reqs[i].URL.Query().Add(querier.QueryIngestersKey, "true") + } else { + reqs[i].URL.Query().Add(querier.QueryIngestersKey, "false") + } } - s.splitByCounter.WithLabelValues(userID).Add(float64(s.shardNum)) + s.splitByCounter.WithLabelValues(userID).Add(float64(s.queryShards)) rrs, err := DoRequests(r.Context(), s.next, reqs) if err != nil { @@ -169,6 +159,28 @@ func (s shardQuery) Do(r *http.Request) (*http.Response, error) { return nil, fmt.Errorf("trace not found") } +func createBlockShards(queryShards int) [][]byte { + if queryShards == 0 { + return nil + } + + // create sharded queries + boundaryBytes := make([][]byte, queryShards+1) + for i := 0; i < queryShards+1; i ++ { + boundaryBytes[i] = make([]byte, 16) + } + const MaxUint = uint64(^uint8(0)) + for i := 0; i < queryShards; i++ { + binary.LittleEndian.PutUint64(boundaryBytes[i][:8], (MaxUint/uint64(queryShards))*uint64(i)) + binary.LittleEndian.PutUint64(boundaryBytes[i][8:], 0) + } + const MaxUint64 = ^uint64(0) + binary.LittleEndian.PutUint64(boundaryBytes[queryShards][:8], MaxUint64) + binary.LittleEndian.PutUint64(boundaryBytes[queryShards][8:], MaxUint64) + + return boundaryBytes +} + // RequestResponse contains a request response and the respective request that was used. type RequestResponse struct { Request *http.Request diff --git a/modules/frontend/frontend_test.go b/modules/frontend/frontend_test.go new file mode 100644 index 00000000000..8af1266413a --- /dev/null +++ b/modules/frontend/frontend_test.go @@ -0,0 +1,44 @@ +package frontend + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestCreateBlockShards(t *testing.T) { + tests := []struct { + name string + queryShards int + expected [][]byte + }{ + { + name: "single shard", + queryShards: 1, + expected: [][]byte{ + {0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, + {0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, + }, + }, + { + name: "multiple shards", + queryShards: 4, + expected: [][]byte{ + {0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, // 0 + {0x3f, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, // 0x3f = 255/4 * 1 + {0x7e, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, // 0x7e = 255/4 * 2 + {0xbd, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, // 0xbd = 255/4 * 3 + {0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bb := createBlockShards(tt.queryShards) + assert.Len(t, bb, len(tt.expected)) + + for i := 0; i < len(bb); i++ { + assert.Equal(t, tt.expected[i], bb[i]) + } + }) + } +} From 27e31794bcfaae272772e18af00f3165a525a52c Mon Sep 17 00:00:00 2001 From: Annanay Date: Thu, 10 Dec 2020 20:36:45 +0530 Subject: [PATCH 3/9] Checkpoint: Frontend integrated Signed-off-by: Annanay --- cmd/tempo/app/app.go | 2 -- cmd/tempo/app/modules.go | 37 +++++++++++------------------ modules/frontend/config.go | 4 ++-- modules/frontend/frontend.go | 46 ++++++++++++++++-------------------- modules/querier/config.go | 19 +++++++++++++-- modules/querier/http.go | 14 +++++++---- modules/querier/querier.go | 25 +++++++++++++++++--- tempodb/tempodb.go | 6 +++-- 8 files changed, 90 insertions(+), 63 deletions(-) diff --git a/cmd/tempo/app/app.go b/cmd/tempo/app/app.go index 272281e0b12..666f5115f80 100644 --- a/cmd/tempo/app/app.go +++ b/cmd/tempo/app/app.go @@ -47,7 +47,6 @@ type Config struct { IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"` Querier querier.Config `yaml:"querier,omitempty"` Frontend frontend.FrontendConfig `yaml:"frontend,omitempty"` - Worker cortex_frontend.WorkerConfig `yaml:"frontend_worker"` Compactor compactor.Config `yaml:"compactor,omitempty"` Ingester ingester.Config `yaml:"ingester,omitempty"` StorageConfig storage.Config `yaml:"storage,omitempty"` @@ -83,7 +82,6 @@ func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { c.Ingester.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "ingester"), f) c.Querier.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "querier"), f) c.Frontend.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "frontend"), f) - // todo: add worker defaults c.Compactor.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "compactor"), f) c.StorageConfig.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "storage"), f) diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index f648e438425..c50b8da1472 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -5,10 +5,9 @@ import ( "net/http" "os" + "github.com/go-kit/kit/log/level" "github.com/cortexproject/cortex/pkg/cortex" - cortex_querier "github.com/cortexproject/cortex/pkg/querier" cortex_frontend "github.com/cortexproject/cortex/pkg/querier/frontend" - httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv/codec" "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" @@ -128,6 +127,18 @@ func (t *App) initIngester() (services.Service, error) { } func (t *App) initQuerier() (services.Service, error) { + // validate worker config + // if we're not in single binary mode and worker address is not specified - bail + if t.cfg.Target != All && t.cfg.Querier.Worker.Address == "" { + return nil, fmt.Errorf("frontend worker address not specified") + } else if t.cfg.Target == All { + // if we're in single binary mode with no worker address specified, register default endpoint + if t.cfg.Querier.Worker.Address == "" { + t.cfg.Querier.Worker.Address = fmt.Sprintf("127.0.0.1:%d", t.cfg.Server.GRPCListenPort) + level.Warn(util.Logger).Log("msg", "Worker address is empty in single binary mode. Attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.", "address", t.cfg.Querier.Worker.Address) + } + } + // todo: make ingester client a module instead of passing config everywhere querier, err := querier.New(t.cfg.Querier, t.cfg.IngesterClient, t.ring, t.store, t.overrides) if err != nil { @@ -138,26 +149,7 @@ func (t *App) initQuerier() (services.Service, error) { tracesHandler := middleware.Merge( t.httpAuthMiddleware, ).Wrap(http.HandlerFunc(t.querier.TraceByIDHandler)) - t.server.HTTP.Handle("/querier/api/traces/{traceID}", tracesHandler) - - worker, err := cortex_frontend.NewWorker( - t.cfg.Worker, - cortex_querier.Config{ - MaxConcurrent: t.cfg.Querier.MaxConcurrent, - }, - httpgrpc_server.NewServer(tracesHandler), - util.Logger, - ) - if err != nil { - return nil, fmt.Errorf("failed to create frontend worker %w", err) - } - - err = querier.RegisterSubservices(worker) - if err != nil { - return nil, fmt.Errorf("failed to register frontend worker %w", err) - } - - return querier, nil + return t.querier, t.querier.CreateAndRegisterWorker(tracesHandler) } func (t *App) initQueryFrontend() (services.Service, error) { @@ -176,7 +168,6 @@ func (t *App) initQueryFrontend() (services.Service, error) { t.frontend.Wrap(tripperware) cortex_frontend.RegisterFrontendServer(t.server.GRPC, t.frontend) - // register at a different endpoint for now t.server.HTTP.Handle("/api/traces/{traceID}", t.frontend.Handler()) return nil, nil diff --git a/modules/frontend/config.go b/modules/frontend/config.go index 30f82459d58..e1c02be9063 100644 --- a/modules/frontend/config.go +++ b/modules/frontend/config.go @@ -8,7 +8,7 @@ import ( type FrontendConfig struct { frontend.Config `yaml:",inline"` - ShardNum int `yaml:"shard_num,omitempty"` + QueryShards int `yaml:"query_shards,omitempty"` } func (cfg *FrontendConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { @@ -16,6 +16,6 @@ func (cfg *FrontendConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag. cfg.Config.DownstreamURL = "" cfg.Config.LogQueriesLongerThan = 0 cfg.Config.MaxOutstandingPerTenant = 100 - cfg.ShardNum = 4 + cfg.QueryShards = 4 } diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go index 0a8e4aca29d..fd4f902d050 100644 --- a/modules/frontend/frontend.go +++ b/modules/frontend/frontend.go @@ -5,17 +5,14 @@ import ( "encoding/binary" "encoding/hex" "fmt" - "net/http" - "github.com/cortexproject/cortex/pkg/querier/frontend" - "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/grafana/tempo/modules/querier" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/user" - - "github.com/grafana/tempo/modules/querier" + "net/http" ) // NewTripperware returns a Tripperware configured with a middleware to split requests @@ -29,9 +26,8 @@ func NewTripperware(cfg FrontendConfig, logger log.Logger, registerer prometheus return func(next http.RoundTripper) http.RoundTripper { // get the http request, add some custom parameters to it, split it, and call downstream roundtripper - rt := NewRoundTripper(next, ShardingWare(cfg.ShardNum, logger, registerer)) + rt := NewRoundTripper(next, ShardingWare(cfg.QueryShards, logger, registerer)) return frontend.RoundTripFunc(func(r *http.Request) (*http.Response, error) { - level.Info(util.Logger).Log("msg", "request received by custom tripperware") orgID := r.Header.Get(user.OrgIDHeaderName) queriesPerTenant.WithLabelValues(orgID).Inc() @@ -81,21 +77,19 @@ func NewRoundTripper(next http.RoundTripper, middlewares ...Middleware) http.Rou } func (q roundTripper) RoundTrip(r *http.Request) (*http.Response, error) { - level.Info(util.Logger).Log("blockStart", r.URL.Query().Get("blockStart"), "blockEnd", r.URL.Query().Get("blockEnd")) return q.handler.Do(r) } // Do implements Handler. func (q roundTripper) Do(r *http.Request) (*http.Response, error) { - level.Info(util.Logger).Log("msg", "roundTripper.Do called") return q.next.RoundTrip(r) } -func ShardingWare(shardNum int, logger log.Logger, registerer prometheus.Registerer) Middleware { +func ShardingWare(queryShards int, logger log.Logger, registerer prometheus.Registerer) Middleware { return MiddlewareFunc(func(next Handler) Handler { return shardQuery{ next: next, - queryShards: shardNum, + queryShards: queryShards, logger: logger, splitByCounter: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", @@ -118,8 +112,6 @@ type shardQuery struct { // Do implements Handler func (s shardQuery) Do(r *http.Request) (*http.Response, error) { - level.Info(s.logger).Log("msg", "shardQuery called") - userID, err := user.ExtractOrgID(r.Context()) if err != nil { return nil, err @@ -132,15 +124,19 @@ func (s shardQuery) Do(r *http.Request) (*http.Response, error) { reqs := make([]*http.Request, s.queryShards) for i := 0; i < s.queryShards; i++ { - reqs[i] = r - reqs[i].URL.Query().Add(querier.BlockStartKey, hex.EncodeToString(s.blockBoundaries[i])) - reqs[i].URL.Query().Add(querier.BlockEndKey, hex.EncodeToString(s.blockBoundaries[i+1])) + reqs[i] = r.Clone(r.Context()) + q := reqs[i].URL.Query() + q.Add(querier.BlockStartKey, hex.EncodeToString(s.blockBoundaries[i])) + q.Add(querier.BlockEndKey, hex.EncodeToString(s.blockBoundaries[i+1])) if i==0 { - reqs[i].URL.Query().Add(querier.QueryIngestersKey, "true") + q.Add(querier.QueryIngestersKey, "true") } else { - reqs[i].URL.Query().Add(querier.QueryIngestersKey, "false") + q.Add(querier.QueryIngestersKey, "false") } + + // adding to RequestURI ONLY because weaveworks/common uses the RequestURI field to translate from + reqs[i].RequestURI = reqs[i].URL.RequestURI() + "?" + q.Encode() } s.splitByCounter.WithLabelValues(userID).Add(float64(s.queryShards)) @@ -165,20 +161,20 @@ func createBlockShards(queryShards int) [][]byte { } // create sharded queries - boundaryBytes := make([][]byte, queryShards+1) + blockBoundaries := make([][]byte, queryShards+1) for i := 0; i < queryShards+1; i ++ { - boundaryBytes[i] = make([]byte, 16) + blockBoundaries[i] = make([]byte, 16) } const MaxUint = uint64(^uint8(0)) for i := 0; i < queryShards; i++ { - binary.LittleEndian.PutUint64(boundaryBytes[i][:8], (MaxUint/uint64(queryShards))*uint64(i)) - binary.LittleEndian.PutUint64(boundaryBytes[i][8:], 0) + binary.LittleEndian.PutUint64(blockBoundaries[i][:8], (MaxUint/uint64(queryShards))*uint64(i)) + binary.LittleEndian.PutUint64(blockBoundaries[i][8:], 0) } const MaxUint64 = ^uint64(0) - binary.LittleEndian.PutUint64(boundaryBytes[queryShards][:8], MaxUint64) - binary.LittleEndian.PutUint64(boundaryBytes[queryShards][8:], MaxUint64) + binary.LittleEndian.PutUint64(blockBoundaries[queryShards][:8], MaxUint64) + binary.LittleEndian.PutUint64(blockBoundaries[queryShards][8:], MaxUint64) - return boundaryBytes + return blockBoundaries } // RequestResponse contains a request response and the respective request that was used. diff --git a/modules/querier/config.go b/modules/querier/config.go index b8ebd56ec9a..2e8a629a1ad 100644 --- a/modules/querier/config.go +++ b/modules/querier/config.go @@ -3,18 +3,33 @@ package querier import ( "flag" "time" + + cortex_frontend "github.com/cortexproject/cortex/pkg/querier/frontend" + "github.com/cortexproject/cortex/pkg/util/grpcclient" ) // Config for a querier. type Config struct { QueryTimeout time.Duration `yaml:"query_timeout"` ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"` - MaxConcurrent int `yaml:"max_concurrent"` + MaxConcurrentQueries int `yaml:"max_concurrent_queries"` + Worker cortex_frontend.WorkerConfig `yaml:"frontend_worker"` } // RegisterFlagsAndApplyDefaults register flags. func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { cfg.QueryTimeout = 10 * time.Second cfg.ExtraQueryDelay = 0 - cfg.MaxConcurrent = 20 + cfg.MaxConcurrentQueries = 5 + cfg.Worker = cortex_frontend.WorkerConfig{ + Parallelism: 2, + GRPCClientConfig: grpcclient.ConfigWithTLS{ + GRPC: grpcclient.Config{ + MaxRecvMsgSize: 100<<20, + MaxSendMsgSize: 16<<20, + UseGzipCompression: false, + GRPCCompression: "", + }, + }, + } } diff --git a/modules/querier/http.go b/modules/querier/http.go index 7ed087b6396..5f5421f4fe0 100644 --- a/modules/querier/http.go +++ b/modules/querier/http.go @@ -4,12 +4,12 @@ import ( "context" "fmt" "net/http" + "strings" "time" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "github.com/google/uuid" - "github.com/gorilla/mux" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/tempodb" @@ -34,9 +34,15 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) { span, ctx := opentracing.StartSpanFromContext(ctx, "Querier.TraceByIDHandler") defer span.Finish() - vars := mux.Vars(r) - traceID, ok := vars[TraceIDVar] - if !ok { + var traceID string + splits := strings.Split(r.RequestURI, "?") + if len(splits) == 0 { + traceID = strings.TrimPrefix(r.RequestURI, "/api/traces/") + } else { + traceID = strings.TrimPrefix(splits[0], "/api/traces/") + } + + if len(traceID) == 0 { http.Error(w, "please provide a traceID", http.StatusBadRequest) return } diff --git a/modules/querier/querier.go b/modules/querier/querier.go index a4658704199..ca82980ba32 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -3,6 +3,7 @@ package querier import ( "context" "fmt" + "net/http" "github.com/gogo/protobuf/proto" "github.com/opentracing/opentracing-go" @@ -12,10 +13,13 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/user" + cortex_querier "github.com/cortexproject/cortex/pkg/querier" + cortex_frontend "github.com/cortexproject/cortex/pkg/querier/frontend" "github.com/cortexproject/cortex/pkg/ring" ring_client "github.com/cortexproject/cortex/pkg/ring/client" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" + httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" ingester_client "github.com/grafana/tempo/modules/ingester/client" "github.com/grafana/tempo/modules/overrides" @@ -87,10 +91,25 @@ func New(cfg Config, clientCfg ingester_client.Config, ring ring.ReadRing, store return q, nil } -// todo: fix name, its confusing -func (q *Querier) RegisterSubservices(w services.Service) error { +func (q *Querier) CreateAndRegisterWorker(tracesHandler http.Handler) error { + worker, err := cortex_frontend.NewWorker( + q.cfg.Worker, + cortex_querier.Config{ + MaxConcurrent: q.cfg.MaxConcurrentQueries, + }, + httpgrpc_server.NewServer(tracesHandler), + util.Logger, + ) + if err != nil { + return fmt.Errorf("failed to create frontend worker: %w", err) + } + + return q.RegisterSubservices(worker, q.pool) +} + +func (q *Querier) RegisterSubservices(s ...services.Service) error { var err error - q.subservices, err = services.NewManager(w, q.pool) + q.subservices, err = services.NewManager(s...) q.subservicesWatcher = services.NewFailureWatcher() q.subservicesWatcher.WatchManager(q.subservices) return err diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index 1943d610b0e..5d80e02f1f3 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -356,13 +356,15 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id encoding.I break } } - span.LogFields(ot_log.String("blockID", meta.BlockID.String()), + level.Info(logger).Log("msg", "searching for trace in block", "findTraceID", hex.EncodeToString(id), "block", meta.BlockID, "found", foundObject != nil) + span.LogFields( + ot_log.String("msg", "searching for trace in block"), + ot_log.String("blockID", meta.BlockID.String()), ot_log.Bool("found", foundObject != nil), ot_log.Int("bytes", len(foundObject))) return foundObject, nil }) - level.Info(logger).Log("msg", "search for trace complete", "findTraceID", hex.EncodeToString(id), "found", len(foundBytes) > 0) return foundBytes, metrics, err } From 3d8930bf8208bf23338a46b8818f438712f3e993 Mon Sep 17 00:00:00 2001 From: Annanay Date: Fri, 11 Dec 2020 15:59:58 +0530 Subject: [PATCH 4/9] Checkpoint: Pass req through mux Signed-off-by: Annanay --- cmd/tempo/app/modules.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index c50b8da1472..880ed346a1e 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -149,7 +149,10 @@ func (t *App) initQuerier() (services.Service, error) { tracesHandler := middleware.Merge( t.httpAuthMiddleware, ).Wrap(http.HandlerFunc(t.querier.TraceByIDHandler)) - return t.querier, t.querier.CreateAndRegisterWorker(tracesHandler) + + // todo: figure this out + t.server.HTTP.Handle("/querier/api/traces/{traceID}", tracesHandler) + return t.querier, t.querier.CreateAndRegisterWorker(t.server.HTTP) } func (t *App) initQueryFrontend() (services.Service, error) { From ec432be5e41d8fc0a558b4d9b5a2198f9042e60a Mon Sep 17 00:00:00 2001 From: Annanay Date: Fri, 11 Dec 2020 18:38:49 +0530 Subject: [PATCH 5/9] Checkpoint: Passing through mux works Signed-off-by: Annanay --- modules/frontend/config.go | 2 +- modules/frontend/frontend.go | 2 +- modules/querier/http.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/frontend/config.go b/modules/frontend/config.go index e1c02be9063..6d35ba6107e 100644 --- a/modules/frontend/config.go +++ b/modules/frontend/config.go @@ -8,7 +8,7 @@ import ( type FrontendConfig struct { frontend.Config `yaml:",inline"` - QueryShards int `yaml:"query_shards,omitempty"` + QueryShards int `yaml:"query_shards,omitempty"` } func (cfg *FrontendConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go index fd4f902d050..f617ae1511e 100644 --- a/modules/frontend/frontend.go +++ b/modules/frontend/frontend.go @@ -136,7 +136,7 @@ func (s shardQuery) Do(r *http.Request) (*http.Response, error) { } // adding to RequestURI ONLY because weaveworks/common uses the RequestURI field to translate from - reqs[i].RequestURI = reqs[i].URL.RequestURI() + "?" + q.Encode() + reqs[i].RequestURI = "/querier" + reqs[i].URL.RequestURI() + "?" + q.Encode() } s.splitByCounter.WithLabelValues(userID).Add(float64(s.queryShards)) diff --git a/modules/querier/http.go b/modules/querier/http.go index 5f5421f4fe0..74bc95fac60 100644 --- a/modules/querier/http.go +++ b/modules/querier/http.go @@ -37,9 +37,9 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) { var traceID string splits := strings.Split(r.RequestURI, "?") if len(splits) == 0 { - traceID = strings.TrimPrefix(r.RequestURI, "/api/traces/") + traceID = strings.TrimPrefix(r.RequestURI, "/querier/api/traces/") } else { - traceID = strings.TrimPrefix(splits[0], "/api/traces/") + traceID = strings.TrimPrefix(splits[0], "/querier/api/traces/") } if len(traceID) == 0 { From b95971de7cf9557fd8869216e2e2da085aa531f4 Mon Sep 17 00:00:00 2001 From: Annanay Date: Fri, 11 Dec 2020 21:11:23 +0530 Subject: [PATCH 6/9] Checkpoint: Fix auth middleware Signed-off-by: Annanay --- cmd/tempo/app/modules.go | 12 +++++++++--- modules/frontend/frontend.go | 25 +++++++++++-------------- modules/querier/http.go | 14 ++++---------- 3 files changed, 24 insertions(+), 27 deletions(-) diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index 880ed346a1e..663637346cf 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -150,7 +150,6 @@ func (t *App) initQuerier() (services.Service, error) { t.httpAuthMiddleware, ).Wrap(http.HandlerFunc(t.querier.TraceByIDHandler)) - // todo: figure this out t.server.HTTP.Handle("/querier/api/traces/{traceID}", tracesHandler) return t.querier, t.querier.CreateAndRegisterWorker(t.server.HTTP) } @@ -170,10 +169,17 @@ func (t *App) initQueryFrontend() (services.Service, error) { // tripperware will be called before f.roundTripper (which calls roundtripgrpc) t.frontend.Wrap(tripperware) + tracesHandler := middleware.Merge( + t.httpAuthMiddleware, + ).Wrap(t.frontend.Handler()) + cortex_frontend.RegisterFrontendServer(t.server.GRPC, t.frontend) - t.server.HTTP.Handle("/api/traces/{traceID}", t.frontend.Handler()) + t.server.HTTP.Handle("/api/traces/{traceID}", tracesHandler) - return nil, nil + return services.NewIdleService(nil, func(_ error) error { + t.frontend.Close() + return nil + }), nil } func (t *App) initCompactor() (services.Service, error) { diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go index f617ae1511e..0346f567b22 100644 --- a/modules/frontend/frontend.go +++ b/modules/frontend/frontend.go @@ -5,6 +5,8 @@ import ( "encoding/binary" "encoding/hex" "fmt" + "net/http" + "github.com/cortexproject/cortex/pkg/querier/frontend" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -12,7 +14,11 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/user" - "net/http" +) + +const ( + querierPrefix = "/querier" + queryDelimiter = "?" ) // NewTripperware returns a Tripperware configured with a middleware to split requests @@ -28,10 +34,8 @@ func NewTripperware(cfg FrontendConfig, logger log.Logger, registerer prometheus // get the http request, add some custom parameters to it, split it, and call downstream roundtripper rt := NewRoundTripper(next, ShardingWare(cfg.QueryShards, logger, registerer)) return frontend.RoundTripFunc(func(r *http.Request) (*http.Response, error) { - orgID := r.Header.Get(user.OrgIDHeaderName) + orgID, _ := user.ExtractOrgID(r.Context()) queriesPerTenant.WithLabelValues(orgID).Inc() - - r = r.WithContext(user.InjectOrgID(r.Context(), orgID)) return rt.RoundTrip(r) }) }, nil @@ -91,11 +95,6 @@ func ShardingWare(queryShards int, logger log.Logger, registerer prometheus.Regi next: next, queryShards: queryShards, logger: logger, - splitByCounter: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ - Namespace: "tempo", - Name: "query_frontend_split_queries_total", - Help: "Total number of underlying query requests after sharding.", - }, []string{"tenant"}), } }) } @@ -105,9 +104,6 @@ type shardQuery struct { queryShards int logger log.Logger blockBoundaries [][]byte - - // Metrics. - splitByCounter *prometheus.CounterVec } // Do implements Handler @@ -135,10 +131,10 @@ func (s shardQuery) Do(r *http.Request) (*http.Response, error) { q.Add(querier.QueryIngestersKey, "false") } + reqs[i].Header.Set(user.OrgIDHeaderName, userID) // adding to RequestURI ONLY because weaveworks/common uses the RequestURI field to translate from - reqs[i].RequestURI = "/querier" + reqs[i].URL.RequestURI() + "?" + q.Encode() + reqs[i].RequestURI = querierPrefix + reqs[i].URL.RequestURI() + queryDelimiter + q.Encode() } - s.splitByCounter.WithLabelValues(userID).Add(float64(s.queryShards)) rrs, err := DoRequests(r.Context(), s.next, reqs) if err != nil { @@ -146,6 +142,7 @@ func (s shardQuery) Do(r *http.Request) (*http.Response, error) { } // todo: add merging logic here if there are more than one results + var statusError int for _, rr := range rrs { if rr.Response.StatusCode == http.StatusOK { return rr.Response, nil diff --git a/modules/querier/http.go b/modules/querier/http.go index 74bc95fac60..7ed087b6396 100644 --- a/modules/querier/http.go +++ b/modules/querier/http.go @@ -4,12 +4,12 @@ import ( "context" "fmt" "net/http" - "strings" "time" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "github.com/google/uuid" + "github.com/gorilla/mux" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/tempodb" @@ -34,15 +34,9 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) { span, ctx := opentracing.StartSpanFromContext(ctx, "Querier.TraceByIDHandler") defer span.Finish() - var traceID string - splits := strings.Split(r.RequestURI, "?") - if len(splits) == 0 { - traceID = strings.TrimPrefix(r.RequestURI, "/querier/api/traces/") - } else { - traceID = strings.TrimPrefix(splits[0], "/querier/api/traces/") - } - - if len(traceID) == 0 { + vars := mux.Vars(r) + traceID, ok := vars[TraceIDVar] + if !ok { http.Error(w, "please provide a traceID", http.StatusBadRequest) return } From 2d2262f964dca33480bd96fe6c6d8736100dd600 Mon Sep 17 00:00:00 2001 From: Annanay Date: Tue, 15 Dec 2020 20:26:37 +0530 Subject: [PATCH 7/9] Add instrumentation, lint Signed-off-by: Annanay --- cmd/tempo/app/app.go | 20 ++++++++++---------- cmd/tempo/app/modules.go | 2 +- modules/frontend/config.go | 3 +-- modules/frontend/frontend.go | 26 +++++++++++++++++++++----- modules/frontend/frontend_test.go | 10 +++++----- modules/querier/config.go | 16 ++++++++-------- modules/querier/querier.go | 2 +- 7 files changed, 47 insertions(+), 32 deletions(-) diff --git a/cmd/tempo/app/app.go b/cmd/tempo/app/app.go index 666f5115f80..9d4b81957a5 100644 --- a/cmd/tempo/app/app.go +++ b/cmd/tempo/app/app.go @@ -25,8 +25,8 @@ import ( "github.com/grafana/tempo/modules/compactor" "github.com/grafana/tempo/modules/distributor" - "github.com/grafana/tempo/modules/ingester" "github.com/grafana/tempo/modules/frontend" + "github.com/grafana/tempo/modules/ingester" ingester_client "github.com/grafana/tempo/modules/ingester/client" "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/modules/querier" @@ -42,16 +42,16 @@ type Config struct { AuthEnabled bool `yaml:"auth_enabled,omitempty"` HTTPPrefix string `yaml:"http_prefix"` - Server server.Config `yaml:"server,omitempty"` - Distributor distributor.Config `yaml:"distributor,omitempty"` - IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"` - Querier querier.Config `yaml:"querier,omitempty"` + Server server.Config `yaml:"server,omitempty"` + Distributor distributor.Config `yaml:"distributor,omitempty"` + IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"` + Querier querier.Config `yaml:"querier,omitempty"` Frontend frontend.FrontendConfig `yaml:"frontend,omitempty"` - Compactor compactor.Config `yaml:"compactor,omitempty"` - Ingester ingester.Config `yaml:"ingester,omitempty"` - StorageConfig storage.Config `yaml:"storage,omitempty"` - LimitsConfig overrides.Limits `yaml:"overrides,omitempty"` - MemberlistKV memberlist.KVConfig `yaml:"memberlist,omitempty"` + Compactor compactor.Config `yaml:"compactor,omitempty"` + Ingester ingester.Config `yaml:"ingester,omitempty"` + StorageConfig storage.Config `yaml:"storage,omitempty"` + LimitsConfig overrides.Limits `yaml:"overrides,omitempty"` + MemberlistKV memberlist.KVConfig `yaml:"memberlist,omitempty"` } // RegisterFlagsAndApplyDefaults registers flag. diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index 663637346cf..40d01a7c9d3 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -5,7 +5,6 @@ import ( "net/http" "os" - "github.com/go-kit/kit/log/level" "github.com/cortexproject/cortex/pkg/cortex" cortex_frontend "github.com/cortexproject/cortex/pkg/querier/frontend" "github.com/cortexproject/cortex/pkg/ring" @@ -14,6 +13,7 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/modules" "github.com/cortexproject/cortex/pkg/util/services" + "github.com/go-kit/kit/log/level" "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "github.com/weaveworks/common/middleware" diff --git a/modules/frontend/config.go b/modules/frontend/config.go index 6d35ba6107e..d62cb0cd7ab 100644 --- a/modules/frontend/config.go +++ b/modules/frontend/config.go @@ -8,7 +8,7 @@ import ( type FrontendConfig struct { frontend.Config `yaml:",inline"` - QueryShards int `yaml:"query_shards,omitempty"` + QueryShards int `yaml:"query_shards,omitempty"` } func (cfg *FrontendConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { @@ -18,4 +18,3 @@ func (cfg *FrontendConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag. cfg.Config.MaxOutstandingPerTenant = 100 cfg.QueryShards = 4 } - diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go index 0346f567b22..d25ed06179f 100644 --- a/modules/frontend/frontend.go +++ b/modules/frontend/frontend.go @@ -5,15 +5,18 @@ import ( "encoding/binary" "encoding/hex" "fmt" + "io/ioutil" "net/http" + "github.com/opentracing/opentracing-go" "github.com/cortexproject/cortex/pkg/querier/frontend" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/grafana/tempo/modules/querier" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/user" + + "github.com/grafana/tempo/modules/querier" ) const ( @@ -31,11 +34,17 @@ func NewTripperware(cfg FrontendConfig, logger log.Logger, registerer prometheus }, []string{"tenant"}) return func(next http.RoundTripper) http.RoundTripper { - // get the http request, add some custom parameters to it, split it, and call downstream roundtripper + // Get the http request, add custom parameters to it, split it, and call downstream roundtripper rt := NewRoundTripper(next, ShardingWare(cfg.QueryShards, logger, registerer)) return frontend.RoundTripFunc(func(r *http.Request) (*http.Response, error) { orgID, _ := user.ExtractOrgID(r.Context()) queriesPerTenant.WithLabelValues(orgID).Inc() + + // tracing instrumentation + span, ctx := opentracing.StartSpanFromContext(r.Context(), "frontend.RoundTrip") + defer span.Finish() + + r = r.WithContext(ctx) return rt.RoundTrip(r) }) }, nil @@ -142,16 +151,23 @@ func (s shardQuery) Do(r *http.Request) (*http.Response, error) { } // todo: add merging logic here if there are more than one results - var statusError int + var errCode int + var errBody []byte for _, rr := range rrs { if rr.Response.StatusCode == http.StatusOK { return rr.Response, nil } + defer rr.Response.Body.Close() + if rr.Response.StatusCode > errCode { + errCode = rr.Response.StatusCode + errBody, _ = ioutil.ReadAll(rr.Response.Body) + } } - return nil, fmt.Errorf("trace not found") + return nil, fmt.Errorf("%s", errBody) } +// createBlockShards splits the blockrange into queryshard parts func createBlockShards(queryShards int) [][]byte { if queryShards == 0 { return nil @@ -180,7 +196,7 @@ type RequestResponse struct { Response *http.Response } -// DoRequests executes a list of requests in parallel. The limits parameters is used to limit parallelism per single request. +// DoRequests executes a list of requests in parallel. func DoRequests(ctx context.Context, downstream Handler, reqs []*http.Request) ([]RequestResponse, error) { // If one of the requests fail, we want to be able to cancel the rest of them. ctx, cancel := context.WithCancel(ctx) diff --git a/modules/frontend/frontend_test.go b/modules/frontend/frontend_test.go index 8af1266413a..3c653947727 100644 --- a/modules/frontend/frontend_test.go +++ b/modules/frontend/frontend_test.go @@ -7,12 +7,12 @@ import ( func TestCreateBlockShards(t *testing.T) { tests := []struct { - name string + name string queryShards int - expected [][]byte + expected [][]byte }{ { - name: "single shard", + name: "single shard", queryShards: 1, expected: [][]byte{ {0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, @@ -20,10 +20,10 @@ func TestCreateBlockShards(t *testing.T) { }, }, { - name: "multiple shards", + name: "multiple shards", queryShards: 4, expected: [][]byte{ - {0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, // 0 + {0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, // 0 {0x3f, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, // 0x3f = 255/4 * 1 {0x7e, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, // 0x7e = 255/4 * 2 {0xbd, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, // 0xbd = 255/4 * 3 diff --git a/modules/querier/config.go b/modules/querier/config.go index 2e8a629a1ad..5204e7b49bf 100644 --- a/modules/querier/config.go +++ b/modules/querier/config.go @@ -10,10 +10,10 @@ import ( // Config for a querier. type Config struct { - QueryTimeout time.Duration `yaml:"query_timeout"` - ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"` - MaxConcurrentQueries int `yaml:"max_concurrent_queries"` - Worker cortex_frontend.WorkerConfig `yaml:"frontend_worker"` + QueryTimeout time.Duration `yaml:"query_timeout"` + ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"` + MaxConcurrentQueries int `yaml:"max_concurrent_queries"` + Worker cortex_frontend.WorkerConfig `yaml:"frontend_worker"` } // RegisterFlagsAndApplyDefaults register flags. @@ -25,10 +25,10 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) Parallelism: 2, GRPCClientConfig: grpcclient.ConfigWithTLS{ GRPC: grpcclient.Config{ - MaxRecvMsgSize: 100<<20, - MaxSendMsgSize: 16<<20, - UseGzipCompression: false, - GRPCCompression: "", + MaxRecvMsgSize: 100 << 20, + MaxSendMsgSize: 16 << 20, + UseGzipCompression: false, + GRPCCompression: "gzip", }, }, } diff --git a/modules/querier/querier.go b/modules/querier/querier.go index ca82980ba32..e504623cfe3 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -59,7 +59,7 @@ type Querier struct { store storage.Store limits *overrides.Overrides - subservices *services.Manager + subservices *services.Manager subservicesWatcher *services.FailureWatcher } From 4659b05b2e56a1249219d2fa8c63113e20e0d99b Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 16 Dec 2020 16:27:55 +0530 Subject: [PATCH 8/9] Add integration test, lint, reorganize code, fix defaults Signed-off-by: Annanay --- CHANGELOG.md | 1 + cmd/tempo/app/app.go | 20 +-- cmd/tempo/app/modules.go | 38 ++--- integration/e2e/config-microservices.yaml | 6 +- integration/e2e/e2e_test.go | 11 +- integration/util.go | 16 ++ modules/frontend/config.go | 6 +- modules/frontend/frontend.go | 170 +--------------------- modules/frontend/frontend_test.go | 3 +- modules/frontend/querysharding.go | 164 +++++++++++++++++++++ 10 files changed, 232 insertions(+), 203 deletions(-) create mode 100644 modules/frontend/querysharding.go diff --git a/CHANGELOG.md b/CHANGELOG.md index dddc4f29de4..0e17c6de403 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ * [CHANGE] Redo tempo-cli with basic command structure and improvements [#385](https://github.com/grafana/tempo/pull/385) * [CHANGE] Add content negotiation support and sharding parameters to Querier [#375](https://github.com/grafana/tempo/pull/375) +* [ENHANCEMENT] Add Query Frontend module to allow scaling the query path [#400](https://github.com/grafana/tempo/pull/400) ## v0.4.0 diff --git a/cmd/tempo/app/app.go b/cmd/tempo/app/app.go index 9d4b81957a5..e18675967c8 100644 --- a/cmd/tempo/app/app.go +++ b/cmd/tempo/app/app.go @@ -42,16 +42,16 @@ type Config struct { AuthEnabled bool `yaml:"auth_enabled,omitempty"` HTTPPrefix string `yaml:"http_prefix"` - Server server.Config `yaml:"server,omitempty"` - Distributor distributor.Config `yaml:"distributor,omitempty"` - IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"` - Querier querier.Config `yaml:"querier,omitempty"` - Frontend frontend.FrontendConfig `yaml:"frontend,omitempty"` - Compactor compactor.Config `yaml:"compactor,omitempty"` - Ingester ingester.Config `yaml:"ingester,omitempty"` - StorageConfig storage.Config `yaml:"storage,omitempty"` - LimitsConfig overrides.Limits `yaml:"overrides,omitempty"` - MemberlistKV memberlist.KVConfig `yaml:"memberlist,omitempty"` + Server server.Config `yaml:"server,omitempty"` + Distributor distributor.Config `yaml:"distributor,omitempty"` + IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"` + Querier querier.Config `yaml:"querier,omitempty"` + Frontend frontend.Config `yaml:"query_frontend,omitempty"` + Compactor compactor.Config `yaml:"compactor,omitempty"` + Ingester ingester.Config `yaml:"ingester,omitempty"` + StorageConfig storage.Config `yaml:"storage,omitempty"` + LimitsConfig overrides.Limits `yaml:"overrides,omitempty"` + MemberlistKV memberlist.KVConfig `yaml:"memberlist,omitempty"` } // RegisterFlagsAndApplyDefaults registers flag. diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index 40d01a7c9d3..5b6fe7418a5 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -32,17 +32,17 @@ import ( // The various modules that make up tempo. const ( - Ring string = "ring" - Overrides string = "overrides" - Server string = "server" - Distributor string = "distributor" - Ingester string = "ingester" - Querier string = "querier" - Frontend string = "frontend" - Compactor string = "compactor" - Store string = "store" - MemberlistKV string = "memberlist-kv" - All string = "all" + Ring string = "ring" + Overrides string = "overrides" + Server string = "server" + Distributor string = "distributor" + Ingester string = "ingester" + Querier string = "querier" + QueryFrontend string = "query-frontend" + Compactor string = "compactor" + Store string = "store" + MemberlistKV string = "memberlist-kv" + All string = "all" ) func (t *App) initServer() (services.Service, error) { @@ -240,7 +240,7 @@ func (t *App) setupModuleManager() error { mm.RegisterModule(Distributor, t.initDistributor) mm.RegisterModule(Ingester, t.initIngester) mm.RegisterModule(Querier, t.initQuerier) - mm.RegisterModule(Frontend, t.initQueryFrontend) + mm.RegisterModule(QueryFrontend, t.initQueryFrontend) mm.RegisterModule(Compactor, t.initCompactor) mm.RegisterModule(Store, t.initStore, modules.UserInvisibleModule) mm.RegisterModule(All, nil) @@ -250,13 +250,13 @@ func (t *App) setupModuleManager() error { // Overrides: nil, // Store: nil, // MemberlistKV: nil, - Frontend: {Server}, - Ring: {Server, MemberlistKV}, - Distributor: {Ring, Server, Overrides}, - Ingester: {Store, Server, Overrides, MemberlistKV}, - Querier: {Store, Ring}, - Compactor: {Store, Server, MemberlistKV}, - All: {Compactor, Frontend, Querier, Ingester, Distributor}, + QueryFrontend: {Server}, + Ring: {Server, MemberlistKV}, + Distributor: {Ring, Server, Overrides}, + Ingester: {Store, Server, Overrides, MemberlistKV}, + Querier: {Store, Ring}, + Compactor: {Store, Server, MemberlistKV}, + All: {Compactor, QueryFrontend, Querier, Ingester, Distributor}, } for mod, targets := range deps { diff --git a/integration/e2e/config-microservices.yaml b/integration/e2e/config-microservices.yaml index c1903bbed15..f2eb2aa35a3 100644 --- a/integration/e2e/config-microservices.yaml +++ b/integration/e2e/config-microservices.yaml @@ -42,4 +42,8 @@ memberlist: - tempo_e2e-distributor:7946 - tempo_e2e-ingester-1:7946 - tempo_e2e-ingester-2:7946 - - tempo_e2e-querier:7946 \ No newline at end of file + - tempo_e2e-querier:7946 + +querier: + frontend_worker: + frontend_address: tempo_e2e-query-frontend:9095 \ No newline at end of file diff --git a/integration/e2e/e2e_test.go b/integration/e2e/e2e_test.go index c59d2fef705..eae407da689 100644 --- a/integration/e2e/e2e_test.go +++ b/integration/e2e/e2e_test.go @@ -69,6 +69,7 @@ func TestAllInOne(t *testing.T) { // test metrics require.NoError(t, tempo.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_ingester_blocks_flushed_total")) require.NoError(t, tempo.WaitSumMetrics(cortex_e2e.Equals(1), "tempodb_blocklist_length")) + require.NoError(t, tempo.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_query_frontend_queries_total")) // query trace - should fetch from backend queryAndAssertTrace(t, "http://"+tempo.Endpoint(3100)+"/api/traces/"+hexID, "my operation", 1) @@ -87,8 +88,9 @@ func TestMicroservices(t *testing.T) { tempoIngester1 := util.NewTempoIngester(1) tempoIngester2 := util.NewTempoIngester(2) tempoDistributor := util.NewTempoDistributor() + tempoQueryFrontend := util.NewTempoQueryFrontend() tempoQuerier := util.NewTempoQuerier() - require.NoError(t, s.StartAndWaitReady(tempoIngester1, tempoIngester2, tempoDistributor, tempoQuerier)) + require.NoError(t, s.StartAndWaitReady(tempoIngester1, tempoIngester2, tempoDistributor, tempoQueryFrontend, tempoQuerier)) // wait for 2 active ingesters time.Sleep(1 * time.Second) @@ -124,7 +126,7 @@ func TestMicroservices(t *testing.T) { require.NoError(t, tempoIngester2.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_ingester_traces_created_total")) // query an in-memory trace - queryAndAssertTrace(t, "http://"+tempoQuerier.Endpoint(3100)+"/api/traces/"+hexID, "my operation", 1) + queryAndAssertTrace(t, "http://"+tempoQueryFrontend.Endpoint(3100)+"/api/traces/"+hexID, "my operation", 1) // flush trace to backend res, err := cortex_e2e.GetRequest("http://" + tempoIngester1.Endpoint(3100) + "/flush") @@ -143,9 +145,10 @@ func TestMicroservices(t *testing.T) { require.NoError(t, tempoIngester1.WaitSumMetrics(cortex_e2e.Equals(2), "tempodb_blocklist_length")) require.NoError(t, tempoIngester2.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_ingester_blocks_flushed_total")) require.NoError(t, tempoIngester2.WaitSumMetrics(cortex_e2e.Equals(2), "tempodb_blocklist_length")) + require.NoError(t, tempoQueryFrontend.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_query_frontend_queries_total")) // query trace - should fetch from backend - queryAndAssertTrace(t, "http://"+tempoQuerier.Endpoint(3100)+"/api/traces/"+hexID, "my operation", 1) + queryAndAssertTrace(t, "http://"+tempoQueryFrontend.Endpoint(3100)+"/api/traces/"+hexID, "my operation", 1) // stop an ingester and confirm we can still write and query err = tempoIngester2.Stop() @@ -156,7 +159,7 @@ func TestMicroservices(t *testing.T) { hexID = fmt.Sprintf("%016x%016x", batch.Spans[0].TraceIdHigh, batch.Spans[0].TraceIdLow) // query an in-memory trace - queryAndAssertTrace(t, "http://"+tempoQuerier.Endpoint(3100)+"/api/traces/"+hexID, "my operation", 1) + queryAndAssertTrace(t, "http://"+tempoQueryFrontend.Endpoint(3100)+"/api/traces/"+hexID, "my operation", 1) // stop another ingester and confirm things fail err = tempoIngester1.Stop() diff --git a/integration/util.go b/integration/util.go index 741a6ee0bac..01a1bf5302b 100644 --- a/integration/util.go +++ b/integration/util.go @@ -70,6 +70,22 @@ func NewTempoIngester(replica int) *cortex_e2e.HTTPService { return s } +func NewTempoQueryFrontend() *cortex_e2e.HTTPService { + args := []string{"-config.file=" + filepath.Join(cortex_e2e.ContainerSharedDir, "config.yaml"), "-target=query-frontend"} + + s := cortex_e2e.NewHTTPService( + "query-frontend", + image, + cortex_e2e.NewCommandWithoutEntrypoint("/tempo", args...), + cortex_e2e.NewHTTPReadinessProbe(3100, "/ready", 200, 299), + 3100, + ) + + s.SetBackoff(tempoBackoff()) + + return s +} + func NewTempoQuerier() *cortex_e2e.HTTPService { args := []string{"-config.file=" + filepath.Join(cortex_e2e.ContainerSharedDir, "config.yaml"), "-target=querier"} diff --git a/modules/frontend/config.go b/modules/frontend/config.go index d62cb0cd7ab..16b78523be8 100644 --- a/modules/frontend/config.go +++ b/modules/frontend/config.go @@ -6,13 +6,13 @@ import ( "github.com/cortexproject/cortex/pkg/querier/frontend" ) -type FrontendConfig struct { +type Config struct { frontend.Config `yaml:",inline"` QueryShards int `yaml:"query_shards,omitempty"` } -func (cfg *FrontendConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { - cfg.Config.CompressResponses = false +func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { + cfg.Config.CompressResponses = true cfg.Config.DownstreamURL = "" cfg.Config.LogQueriesLongerThan = 0 cfg.Config.MaxOutstandingPerTenant = 100 diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go index d25ed06179f..cfaae39e58a 100644 --- a/modules/frontend/frontend.go +++ b/modules/frontend/frontend.go @@ -1,31 +1,20 @@ package frontend import ( - "context" - "encoding/binary" - "encoding/hex" - "fmt" - "io/ioutil" "net/http" - "github.com/opentracing/opentracing-go" - "github.com/cortexproject/cortex/pkg/querier/frontend" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/user" - "github.com/grafana/tempo/modules/querier" -) - -const ( - querierPrefix = "/querier" - queryDelimiter = "?" + "github.com/cortexproject/cortex/pkg/querier/frontend" ) // NewTripperware returns a Tripperware configured with a middleware to split requests -func NewTripperware(cfg FrontendConfig, logger log.Logger, registerer prometheus.Registerer) (frontend.Tripperware, error) { +func NewTripperware(cfg Config, logger log.Logger, registerer prometheus.Registerer) (frontend.Tripperware, error) { level.Info(logger).Log("msg", "creating tripperware in query frontend to shard queries") queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", @@ -35,7 +24,7 @@ func NewTripperware(cfg FrontendConfig, logger log.Logger, registerer prometheus return func(next http.RoundTripper) http.RoundTripper { // Get the http request, add custom parameters to it, split it, and call downstream roundtripper - rt := NewRoundTripper(next, ShardingWare(cfg.QueryShards, logger, registerer)) + rt := NewRoundTripper(next, ShardingWare(cfg.QueryShards, logger)) return frontend.RoundTripFunc(func(r *http.Request) (*http.Response, error) { orgID, _ := user.ExtractOrgID(r.Context()) queriesPerTenant.WithLabelValues(orgID).Inc() @@ -83,7 +72,7 @@ type roundTripper struct { // NewRoundTripper merges a set of middlewares into an handler, then inject it into the `next` roundtripper func NewRoundTripper(next http.RoundTripper, middlewares ...Middleware) http.RoundTripper { transport := roundTripper{ - next: next, + next: next, } transport.handler = MergeMiddlewares(middlewares...).Wrap(&transport) return transport @@ -97,152 +86,3 @@ func (q roundTripper) RoundTrip(r *http.Request) (*http.Response, error) { func (q roundTripper) Do(r *http.Request) (*http.Response, error) { return q.next.RoundTrip(r) } - -func ShardingWare(queryShards int, logger log.Logger, registerer prometheus.Registerer) Middleware { - return MiddlewareFunc(func(next Handler) Handler { - return shardQuery{ - next: next, - queryShards: queryShards, - logger: logger, - } - }) -} - -type shardQuery struct { - next Handler - queryShards int - logger log.Logger - blockBoundaries [][]byte -} - -// Do implements Handler -func (s shardQuery) Do(r *http.Request) (*http.Response, error) { - userID, err := user.ExtractOrgID(r.Context()) - if err != nil { - return nil, err - } - - // only need to initialise boundaries once - if len(s.blockBoundaries) == 0 { - s.blockBoundaries = createBlockShards(s.queryShards) - } - - reqs := make([]*http.Request, s.queryShards) - for i := 0; i < s.queryShards; i++ { - reqs[i] = r.Clone(r.Context()) - q := reqs[i].URL.Query() - q.Add(querier.BlockStartKey, hex.EncodeToString(s.blockBoundaries[i])) - q.Add(querier.BlockEndKey, hex.EncodeToString(s.blockBoundaries[i+1])) - - if i==0 { - q.Add(querier.QueryIngestersKey, "true") - } else { - q.Add(querier.QueryIngestersKey, "false") - } - - reqs[i].Header.Set(user.OrgIDHeaderName, userID) - // adding to RequestURI ONLY because weaveworks/common uses the RequestURI field to translate from - reqs[i].RequestURI = querierPrefix + reqs[i].URL.RequestURI() + queryDelimiter + q.Encode() - } - - rrs, err := DoRequests(r.Context(), s.next, reqs) - if err != nil { - return nil, err - } - - // todo: add merging logic here if there are more than one results - var errCode int - var errBody []byte - for _, rr := range rrs { - if rr.Response.StatusCode == http.StatusOK { - return rr.Response, nil - } - defer rr.Response.Body.Close() - if rr.Response.StatusCode > errCode { - errCode = rr.Response.StatusCode - errBody, _ = ioutil.ReadAll(rr.Response.Body) - } - } - - return nil, fmt.Errorf("%s", errBody) -} - -// createBlockShards splits the blockrange into queryshard parts -func createBlockShards(queryShards int) [][]byte { - if queryShards == 0 { - return nil - } - - // create sharded queries - blockBoundaries := make([][]byte, queryShards+1) - for i := 0; i < queryShards+1; i ++ { - blockBoundaries[i] = make([]byte, 16) - } - const MaxUint = uint64(^uint8(0)) - for i := 0; i < queryShards; i++ { - binary.LittleEndian.PutUint64(blockBoundaries[i][:8], (MaxUint/uint64(queryShards))*uint64(i)) - binary.LittleEndian.PutUint64(blockBoundaries[i][8:], 0) - } - const MaxUint64 = ^uint64(0) - binary.LittleEndian.PutUint64(blockBoundaries[queryShards][:8], MaxUint64) - binary.LittleEndian.PutUint64(blockBoundaries[queryShards][8:], MaxUint64) - - return blockBoundaries -} - -// RequestResponse contains a request response and the respective request that was used. -type RequestResponse struct { - Request *http.Request - Response *http.Response -} - -// DoRequests executes a list of requests in parallel. -func DoRequests(ctx context.Context, downstream Handler, reqs []*http.Request) ([]RequestResponse, error) { - // If one of the requests fail, we want to be able to cancel the rest of them. - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // Feed all requests to a bounded intermediate channel to limit parallelism. - intermediate := make(chan *http.Request) - go func() { - for _, req := range reqs { - intermediate <- req - } - close(intermediate) - }() - - respChan, errChan := make(chan RequestResponse), make(chan error) - // todo: make this configurable using limits - parallelism := 10 - if parallelism > len(reqs) { - parallelism = len(reqs) - } - for i := 0; i < parallelism; i++ { - go func() { - for req := range intermediate { - resp, err := downstream.Do(req) - if err != nil { - errChan <- err - } else { - respChan <- RequestResponse{req, resp} - } - } - }() - } - - resps := make([]RequestResponse, 0, len(reqs)) - var firstErr error - for range reqs { - select { - case resp := <-respChan: - resps = append(resps, resp) - case err := <-errChan: - if firstErr == nil { - cancel() - firstErr = err - } - } - } - - return resps, firstErr -} diff --git a/modules/frontend/frontend_test.go b/modules/frontend/frontend_test.go index 3c653947727..a7b30d148d4 100644 --- a/modules/frontend/frontend_test.go +++ b/modules/frontend/frontend_test.go @@ -1,8 +1,9 @@ package frontend import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestCreateBlockShards(t *testing.T) { diff --git a/modules/frontend/querysharding.go b/modules/frontend/querysharding.go new file mode 100644 index 00000000000..d297ee9464c --- /dev/null +++ b/modules/frontend/querysharding.go @@ -0,0 +1,164 @@ +package frontend + +import ( + "context" + "encoding/binary" + "encoding/hex" + "fmt" + "io/ioutil" + "net/http" + + "github.com/go-kit/kit/log" + "github.com/weaveworks/common/user" + + "github.com/grafana/tempo/modules/querier" +) + +const ( + querierPrefix = "/querier" + queryDelimiter = "?" +) + +func ShardingWare(queryShards int, logger log.Logger) Middleware { + return MiddlewareFunc(func(next Handler) Handler { + return shardQuery{ + next: next, + queryShards: queryShards, + logger: logger, + } + }) +} + +type shardQuery struct { + next Handler + queryShards int + logger log.Logger + blockBoundaries [][]byte +} + +// Do implements Handler +func (s shardQuery) Do(r *http.Request) (*http.Response, error) { + userID, err := user.ExtractOrgID(r.Context()) + if err != nil { + return nil, err + } + + // only need to initialise boundaries once + if len(s.blockBoundaries) == 0 { + s.blockBoundaries = createBlockShards(s.queryShards) + } + + reqs := make([]*http.Request, s.queryShards) + for i := 0; i < s.queryShards; i++ { + reqs[i] = r.Clone(r.Context()) + q := reqs[i].URL.Query() + q.Add(querier.BlockStartKey, hex.EncodeToString(s.blockBoundaries[i])) + q.Add(querier.BlockEndKey, hex.EncodeToString(s.blockBoundaries[i+1])) + + if i == 0 { + q.Add(querier.QueryIngestersKey, "true") + } else { + q.Add(querier.QueryIngestersKey, "false") + } + + reqs[i].Header.Set(user.OrgIDHeaderName, userID) + // adding to RequestURI ONLY because weaveworks/common uses the RequestURI field to translate from + reqs[i].RequestURI = querierPrefix + reqs[i].URL.RequestURI() + queryDelimiter + q.Encode() + } + + rrs, err := DoRequests(r.Context(), s.next, reqs) + if err != nil { + return nil, err + } + + // todo: add merging logic here if there are more than one results + var errCode int + var errBody []byte + for _, rr := range rrs { + if rr.Response.StatusCode == http.StatusOK { + return rr.Response, nil + } + if rr.Response.StatusCode > errCode { + errCode = rr.Response.StatusCode + errBody, _ = ioutil.ReadAll(rr.Response.Body) + rr.Response.Body.Close() + } + } + + return nil, fmt.Errorf("%s", errBody) +} + +// createBlockShards splits the blockrange into queryshard parts +func createBlockShards(queryShards int) [][]byte { + if queryShards == 0 { + return nil + } + + // create sharded queries + blockBoundaries := make([][]byte, queryShards+1) + for i := 0; i < queryShards+1; i++ { + blockBoundaries[i] = make([]byte, 16) + } + const MaxUint = uint64(^uint8(0)) + for i := 0; i < queryShards; i++ { + binary.LittleEndian.PutUint64(blockBoundaries[i][:8], (MaxUint/uint64(queryShards))*uint64(i)) + binary.LittleEndian.PutUint64(blockBoundaries[i][8:], 0) + } + const MaxUint64 = ^uint64(0) + binary.LittleEndian.PutUint64(blockBoundaries[queryShards][:8], MaxUint64) + binary.LittleEndian.PutUint64(blockBoundaries[queryShards][8:], MaxUint64) + + return blockBoundaries +} + +// RequestResponse contains a request response and the respective request that was used. +type RequestResponse struct { + Request *http.Request + Response *http.Response +} + +// DoRequests executes a list of requests in parallel. +func DoRequests(ctx context.Context, downstream Handler, reqs []*http.Request) ([]RequestResponse, error) { + // Feed all requests to a bounded intermediate channel to limit parallelism. + intermediate := make(chan *http.Request) + go func() { + for _, req := range reqs { + intermediate <- req + } + close(intermediate) + }() + + respChan, errChan := make(chan RequestResponse), make(chan error) + // todo: make this configurable using limits + parallelism := 10 + if parallelism > len(reqs) { + parallelism = len(reqs) + } + for i := 0; i < parallelism; i++ { + go func() { + for req := range intermediate { + resp, err := downstream.Do(req) + if err != nil { + errChan <- err + } else { + respChan <- RequestResponse{req, resp} + } + } + }() + } + + resps := make([]RequestResponse, 0, len(reqs)) + var firstErr error + for range reqs { + select { + case resp := <-respChan: + resps = append(resps, resp) + case err := <-errChan: + if firstErr == nil { + firstErr = err + } + } + } + + return resps, firstErr +} From 7da27af58bf6a3b58382e880adb9f4d856a0f640 Mon Sep 17 00:00:00 2001 From: Annanay Date: Thu, 17 Dec 2020 13:47:53 +0530 Subject: [PATCH 9/9] Address comments, add querier.frontend-address flag Signed-off-by: Annanay --- modules/frontend/querysharding.go | 13 ++++++++----- .../{frontend_test.go => querysharding_test.go} | 2 +- modules/querier/config.go | 5 ++++- 3 files changed, 13 insertions(+), 7 deletions(-) rename modules/frontend/{frontend_test.go => querysharding_test.go} (96%) diff --git a/modules/frontend/querysharding.go b/modules/frontend/querysharding.go index d297ee9464c..f6a6bf30921 100644 --- a/modules/frontend/querysharding.go +++ b/modules/frontend/querysharding.go @@ -45,11 +45,11 @@ func (s shardQuery) Do(r *http.Request) (*http.Response, error) { // only need to initialise boundaries once if len(s.blockBoundaries) == 0 { - s.blockBoundaries = createBlockShards(s.queryShards) + s.blockBoundaries = createBlockBoundaries(s.queryShards) } reqs := make([]*http.Request, s.queryShards) - for i := 0; i < s.queryShards; i++ { + for i := 0; i < len(s.blockBoundaries)-1; i++ { reqs[i] = r.Clone(r.Context()) q := reqs[i].URL.Query() q.Add(querier.BlockStartKey, hex.EncodeToString(s.blockBoundaries[i])) @@ -62,7 +62,10 @@ func (s shardQuery) Do(r *http.Request) (*http.Response, error) { } reqs[i].Header.Set(user.OrgIDHeaderName, userID) - // adding to RequestURI ONLY because weaveworks/common uses the RequestURI field to translate from + + // adding to RequestURI only because weaveworks/common uses the RequestURI field to + // translate from http.Request to httpgrpc.Request + // https://github.com/weaveworks/common/blob/47e357f4e1badb7da17ad74bae63e228bdd76e8f/httpgrpc/server/server.go#L48 reqs[i].RequestURI = querierPrefix + reqs[i].URL.RequestURI() + queryDelimiter + q.Encode() } @@ -88,8 +91,8 @@ func (s shardQuery) Do(r *http.Request) (*http.Response, error) { return nil, fmt.Errorf("%s", errBody) } -// createBlockShards splits the blockrange into queryshard parts -func createBlockShards(queryShards int) [][]byte { +// createBlockBoundaries splits the range of blockIDs into queryShards parts +func createBlockBoundaries(queryShards int) [][]byte { if queryShards == 0 { return nil } diff --git a/modules/frontend/frontend_test.go b/modules/frontend/querysharding_test.go similarity index 96% rename from modules/frontend/frontend_test.go rename to modules/frontend/querysharding_test.go index a7b30d148d4..5be23263a88 100644 --- a/modules/frontend/frontend_test.go +++ b/modules/frontend/querysharding_test.go @@ -34,7 +34,7 @@ func TestCreateBlockShards(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - bb := createBlockShards(tt.queryShards) + bb := createBlockBoundaries(tt.queryShards) assert.Len(t, bb, len(tt.expected)) for i := 0; i < len(bb); i++ { diff --git a/modules/querier/config.go b/modules/querier/config.go index 5204e7b49bf..b7a005aa6fb 100644 --- a/modules/querier/config.go +++ b/modules/querier/config.go @@ -22,7 +22,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.ExtraQueryDelay = 0 cfg.MaxConcurrentQueries = 5 cfg.Worker = cortex_frontend.WorkerConfig{ - Parallelism: 2, + MatchMaxConcurrency: true, + Parallelism: 2, GRPCClientConfig: grpcclient.ConfigWithTLS{ GRPC: grpcclient.Config{ MaxRecvMsgSize: 100 << 20, @@ -32,4 +33,6 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) }, }, } + + f.StringVar(&cfg.Worker.Address, prefix+".frontend-address", "", "Address of query frontend service, in host:port format.") }