Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Query Frontend Module #400

Merged
merged 10 commits into from
Jan 4, 2021
25 changes: 16 additions & 9 deletions cmd/tempo/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net/http"

cortex_frontend "github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
"github.com/cortexproject/cortex/pkg/util"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/grafana/tempo/modules/compactor"
"github.com/grafana/tempo/modules/distributor"
"github.com/grafana/tempo/modules/ingester"
"github.com/grafana/tempo/modules/frontend"
ingester_client "github.com/grafana/tempo/modules/ingester/client"
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/modules/querier"
Expand All @@ -40,15 +42,17 @@ type Config struct {
AuthEnabled bool `yaml:"auth_enabled,omitempty"`
HTTPPrefix string `yaml:"http_prefix"`

Server server.Config `yaml:"server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
Compactor compactor.Config `yaml:"compactor,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage,omitempty"`
LimitsConfig overrides.Limits `yaml:"overrides,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist,omitempty"`
Server server.Config `yaml:"server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
Frontend frontend.FrontendConfig `yaml:"frontend,omitempty"`
Worker cortex_frontend.WorkerConfig `yaml:"frontend_worker"`
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
Compactor compactor.Config `yaml:"compactor,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage,omitempty"`
LimitsConfig overrides.Limits `yaml:"overrides,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist,omitempty"`
}

// RegisterFlagsAndApplyDefaults registers flag.
Expand Down Expand Up @@ -78,6 +82,8 @@ func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
c.Distributor.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "distributor"), f)
c.Ingester.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "ingester"), f)
c.Querier.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "querier"), f)
c.Frontend.ApplyDefaults()
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
// todo: add worker defaults
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
c.Compactor.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "compactor"), f)
c.StorageConfig.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "storage"), f)

Expand Down Expand Up @@ -110,6 +116,7 @@ type App struct {
overrides *overrides.Overrides
distributor *distributor.Distributor
querier *querier.Querier
frontend *cortex_frontend.Frontend
compactor *compactor.Compactor
ingester *ingester.Ingester
store storage.Store
Expand Down
51 changes: 48 additions & 3 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"os"

"github.com/cortexproject/cortex/pkg/cortex"
cortex_querier "github.com/cortexproject/cortex/pkg/querier"
cortex_frontend "github.com/cortexproject/cortex/pkg/querier/frontend"
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
Expand All @@ -19,6 +22,7 @@ import (

"github.com/grafana/tempo/modules/compactor"
"github.com/grafana/tempo/modules/distributor"
"github.com/grafana/tempo/modules/frontend"
"github.com/grafana/tempo/modules/ingester"
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/modules/querier"
Expand All @@ -35,6 +39,7 @@ const (
Distributor string = "distributor"
Ingester string = "ingester"
Querier string = "querier"
Frontend string = "frontend"
Compactor string = "compactor"
Store string = "store"
MemberlistKV string = "memberlist-kv"
Expand Down Expand Up @@ -133,10 +138,48 @@ func (t *App) initQuerier() (services.Service, error) {
tracesHandler := middleware.Merge(
t.httpAuthMiddleware,
).Wrap(http.HandlerFunc(t.querier.TraceByIDHandler))

t.server.HTTP.Handle("/api/traces/{traceID}", tracesHandler)

return t.querier, nil
worker, err := cortex_frontend.NewWorker(
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
t.cfg.Worker,
cortex_querier.Config{
MaxConcurrent: t.cfg.Querier.MaxConcurrent,
},
httpgrpc_server.NewServer(tracesHandler),
util.Logger,
)
if err != nil {
return nil, fmt.Errorf("failed to create frontend worker %w", err)
}

err = querier.RegisterSubservices(worker)
if err != nil {
return nil, fmt.Errorf("failed to register frontend worker %w", err)
}

return querier, nil
}

// initQueryFrontend creates the query-frontend module: it instantiates the
// cortex frontend, wraps it with Tempo's request-sharding tripperware, and
// registers both the gRPC frontend server (pulled by querier workers) and an
// HTTP handler for trace-by-ID queries.
func (t *App) initQueryFrontend() (services.Service, error) {
	var err error
	t.frontend, err = cortex_frontend.New(t.cfg.Frontend.Config, util.Logger, prometheus.DefaultRegisterer)
	if err != nil {
		return nil, err
	}

	// custom tripperware that splits requests
	tripperware, err := frontend.NewTripperware(t.cfg.Frontend, util.Logger, prometheus.DefaultRegisterer)
	if err != nil {
		return nil, err
	}
	// tripperware will be called before f.roundTripper (which calls roundtripgrpc)
	t.frontend.Wrap(tripperware)

	// expose the frontend over gRPC so querier workers can pull queries from it
	cortex_frontend.RegisterFrontendServer(t.server.GRPC, t.frontend)
	// register at a different endpoint for now
	t.server.HTTP.Handle("/api/traces/frontend/{traceID}", t.frontend.Handler())

	// NOTE(review): returns a nil service, so the module manager has no
	// lifecycle hooks for the frontend — presumably shutdown is handled by the
	// server module; confirm this is intentional.
	return nil, nil
}

func (t *App) initCompactor() (services.Service, error) {
Expand Down Expand Up @@ -197,6 +240,7 @@ func (t *App) setupModuleManager() error {
mm.RegisterModule(Distributor, t.initDistributor)
mm.RegisterModule(Ingester, t.initIngester)
mm.RegisterModule(Querier, t.initQuerier)
mm.RegisterModule(Frontend, t.initQueryFrontend)
mm.RegisterModule(Compactor, t.initCompactor)
mm.RegisterModule(Store, t.initStore, modules.UserInvisibleModule)
mm.RegisterModule(All, nil)
Expand All @@ -206,12 +250,13 @@ func (t *App) setupModuleManager() error {
// Overrides: nil,
// Store: nil,
// MemberlistKV: nil,
Frontend: {Server},
Ring: {Server, MemberlistKV},
Distributor: {Ring, Server, Overrides},
Ingester: {Store, Server, Overrides, MemberlistKV},
Querier: {Store, Ring},
Compactor: {Store, Server, MemberlistKV},
All: {Compactor, Querier, Ingester, Distributor},
All: {Compactor, Frontend, Querier, Ingester, Distributor},
}

for mod, targets := range deps {
Expand Down
227 changes: 227 additions & 0 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
package frontend

import (
"context"
"encoding/binary"
"fmt"
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/user"
"net/http"
)

// FrontendConfig configures the Tempo query frontend. It embeds the cortex
// frontend configuration (inlined into the same YAML level) and adds
// Tempo-specific sharding options.
type FrontendConfig struct {
	frontend.Config `yaml:",inline"`
	// ShardNum is the number of block-range shards each incoming query is split into.
	ShardNum int `yaml:"shard_num,omitempty"`
}

func (cfg *FrontendConfig) ApplyDefaults() {
cfg.Config.CompressResponses = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm curious why we would default this to false. it seems like you were just following suit with Cortex, but unless I'm misunderstanding the functionality it feels like this should be true.

cfg.Config.DownstreamURL = ""
cfg.Config.LogQueriesLongerThan = 0
cfg.Config.MaxOutstandingPerTenant = 100
cfg.ShardNum = 4
}

// NewTripperware returns a Tripperware configured with a middleware to split requests
func NewTripperware(cfg FrontendConfig, logger log.Logger, registerer prometheus.Registerer) (frontend.Tripperware, error) {
level.Info(logger).Log("msg", "creating tripperware in query frontend to shard queries")
queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "query_frontend_queries_total",
Help: "Total queries sent per tenant.",
}, []string{"user"})

return func(next http.RoundTripper) http.RoundTripper {
// get the http request, add some custom parameters to it, split it, and call downstream roundtripper
rt := NewRoundTripper(next, ShardingWare(cfg.ShardNum, logger, registerer))
return frontend.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
level.Info(util.Logger).Log("msg", "request received by custom tripperware")
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
orgID := r.Header.Get(user.OrgIDHeaderName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use the standard weaveworks ExtractOrgID() method?

queriesPerTenant.WithLabelValues(orgID).Inc()

r = r.WithContext(user.InjectOrgID(r.Context(), orgID))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't the org id already be in the context? There is standard auth middleware that should do this for us and inject single-tenant if auth is disabled.

return rt.RoundTrip(r)
})
}, nil
}

// Handler executes a single HTTP request and returns its response. It is the
// unit the frontend middleware chain is built from.
type Handler interface {
	Do(*http.Request) (*http.Response, error)
}

// Middleware wraps a Handler with additional behavior (e.g. query sharding).
type Middleware interface {
	Wrap(Handler) Handler
}

// MiddlewareFunc is like http.HandlerFunc, but for Middleware: it lets a plain
// function satisfy the Middleware interface.
type MiddlewareFunc func(Handler) Handler

// Wrap implements Middleware.
func (q MiddlewareFunc) Wrap(h Handler) Handler {
	return q(h)
}

// MergeMiddlewares collapses a list of middlewares into a single Middleware.
// The first middleware in the list becomes the outermost wrapper, so it sees
// each request before any of the later ones.
func MergeMiddlewares(middleware ...Middleware) Middleware {
	return MiddlewareFunc(func(next Handler) Handler {
		wrapped := next
		// Wrap from the innermost (last) middleware out to the first.
		for remaining := len(middleware); remaining > 0; remaining-- {
			wrapped = middleware[remaining-1].Wrap(wrapped)
		}
		return wrapped
	})
}

// roundTripper bridges the Middleware/Handler chain back into an
// http.RoundTripper: requests enter via RoundTrip, flow through `handler`
// (the merged middleware chain), and finally reach `next` via Do.
type roundTripper struct {
	next    http.RoundTripper
	handler Handler
}

// NewRoundTripper merges a set of middlewares into an handler, then inject it into the `next` roundtripper.
//
// The construction is deliberately self-referential: transport.handler wraps
// &transport itself, so the middleware chain terminates back at
// roundTripper.Do, which forwards to `next`.
func NewRoundTripper(next http.RoundTripper, middlewares ...Middleware) http.RoundTripper {
	transport := roundTripper{
		next: next,
	}
	transport.handler = MergeMiddlewares(middlewares...).Wrap(&transport)
	return transport
}

// RoundTrip implements http.RoundTripper by feeding the request into the
// middleware chain (q.handler).
func (q roundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
	// demoted from Info: this fires on every request and is only useful when debugging
	level.Debug(util.Logger).Log("blockStart", r.URL.Query().Get("blockStart"), "blockEnd", r.URL.Query().Get("blockEnd"))
	return q.handler.Do(r)
}

// Do implements Handler. It is the innermost element of the middleware chain
// and forwards the (possibly sharded) request to the downstream round tripper.
func (q roundTripper) Do(r *http.Request) (*http.Response, error) {
	// demoted from Info: per-request trace logging belongs at debug level
	level.Debug(util.Logger).Log("msg", "roundTripper.Do called")
	return q.next.RoundTrip(r)
}

// ShardingWare returns a Middleware that splits an incoming query into
// shardNum block-range sub-queries and fans them out to the next Handler.
func ShardingWare(shardNum int, logger log.Logger, registerer prometheus.Registerer) Middleware {
	// Register the counter once here rather than inside the MiddlewareFunc:
	// promauto panics on duplicate registration, so creating it per Wrap call
	// would crash if this middleware were ever applied more than once.
	splitByCounter := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
		Namespace: "tempo",
		Name:      "query_frontend_split_queries_total",
		Help:      "Total number of underlying query requests after sharding",
	}, []string{"user"})

	return MiddlewareFunc(func(next Handler) Handler {
		return shardQuery{
			next:           next,
			shardNum:       shardNum,
			logger:         logger,
			splitByCounter: splitByCounter,
		}
	})
}

// shardQuery is a Handler that fans a single trace-by-ID query out into
// shardNum sub-queries, each covering a disjoint block-ID range.
type shardQuery struct {
	next     Handler
	shardNum int
	logger   log.Logger
	// Metrics.
	splitByCounter *prometheus.CounterVec
}

// Do implements Handler
func (s shardQuery) Do(r *http.Request) (*http.Response, error) {
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
level.Info(s.logger).Log("msg", "shardQuery called")

userID, err := user.ExtractOrgID(r.Context())
if err != nil {
return nil, err
}

// create sharded queries
boundaryBytes := make([][]byte, s.shardNum+1)
for i := 0; i < s.shardNum+1; i ++ {
boundaryBytes[i] = make([]byte, 0)
}
const MaxUint = ^uint64(0)
const MaxInt = int64(MaxUint >> 1)
for i := 0; i < s.shardNum; i++ {
binary.PutVarint(boundaryBytes[i], MaxInt*(int64(i))/int64(s.shardNum))
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
binary.PutVarint(boundaryBytes[i], 0)
}
binary.PutVarint(boundaryBytes[s.shardNum], MaxInt)
binary.PutVarint(boundaryBytes[s.shardNum], MaxInt)

reqs := make([]*http.Request, s.shardNum)
for i := 0; i < s.shardNum; i++ {
reqs[i] = r
reqs[i].URL.Query().Add("blockStart", string(boundaryBytes[i]))
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
reqs[i].URL.Query().Add("blockEnd", string(boundaryBytes[i+1]))
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
}
s.splitByCounter.WithLabelValues(userID).Add(float64(s.shardNum))
annanay25 marked this conversation as resolved.
Show resolved Hide resolved

rrs, err := DoRequests(r.Context(), s.next, reqs)
if err != nil {
return nil, err
}

// todo: add merging logic here if there are more than one results
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Use the util.Combine... methods to do so.

for _, rr := range rrs {
if rr.Response.StatusCode == http.StatusOK {
return rr.Response, nil
}
}

return nil, fmt.Errorf("trace not found")
}

// RequestResponse contains a request response and the respective request that
// was used, so callers can correlate each result with its originating shard.
type RequestResponse struct {
	Request  *http.Request
	Response *http.Response
}

// DoRequests executes a list of requests in parallel. The limits parameters is used to limit parallelism per single request.
//
// A bounded pool of worker goroutines pulls requests from an intermediate
// channel. Each request produces exactly one message (a response or an error),
// and the receive loop runs exactly len(reqs) times, so no worker is left
// blocked on a send.
func DoRequests(ctx context.Context, downstream Handler, reqs []*http.Request) ([]RequestResponse, error) {
	// If one of the requests fail, we want to be able to cancel the rest of them.
	// NOTE(review): the cancelled ctx is not passed to downstream.Do — each
	// request carries its own context — so confirm cancellation actually
	// short-circuits the in-flight sub-requests.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Feed all requests to a bounded intermediate channel to limit parallelism.
	intermediate := make(chan *http.Request)
	go func() {
		for _, req := range reqs {
			intermediate <- req
		}
		close(intermediate)
	}()

	respChan, errChan := make(chan RequestResponse), make(chan error)
	// todo: make this configurable using limits
	parallelism := 10
	if parallelism > len(reqs) {
		parallelism = len(reqs)
	}
	for i := 0; i < parallelism; i++ {
		go func() {
			for req := range intermediate {
				resp, err := downstream.Do(req)
				if err != nil {
					errChan <- err
				} else {
					respChan <- RequestResponse{req, resp}
				}
			}
		}()
	}

	// Collect exactly one result per request; only the first error is kept,
	// but the loop keeps draining so every worker can finish.
	resps := make([]RequestResponse, 0, len(reqs))
	var firstErr error
	for range reqs {
		select {
		case resp := <-respChan:
			resps = append(resps, resp)
		case err := <-errChan:
			if firstErr == nil {
				cancel()
				firstErr = err
			}
		}
	}

	return resps, firstErr
}
Loading