Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Query Frontend Module #400

Merged
merged 10 commits into from
Jan 4, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [CHANGE] Remove S3 automatic bucket creation [#404](https://github.com/grafana/tempo/pull/404)
* [ENHANCEMENT] Add docker-compose example for GCS along with new backend options [#397](https://github.com/grafana/tempo/pull/397)
* [ENHANCEMENT] tempo-cli list blocks usability improvements [#403](https://github.com/grafana/tempo/pull/403)
* [ENHANCEMENT] Add Query Frontend module to allow scaling the query path [#400](https://github.com/grafana/tempo/pull/400)
* [BUGFIX] Compactor without GCS permissions fails silently [#379](https://github.com/grafana/tempo/issues/379)

## v0.4.0
Expand Down
5 changes: 5 additions & 0 deletions cmd/tempo/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net/http"

cortex_frontend "github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
"github.com/cortexproject/cortex/pkg/util"
Expand All @@ -24,6 +25,7 @@ import (

"github.com/grafana/tempo/modules/compactor"
"github.com/grafana/tempo/modules/distributor"
"github.com/grafana/tempo/modules/frontend"
"github.com/grafana/tempo/modules/ingester"
ingester_client "github.com/grafana/tempo/modules/ingester/client"
"github.com/grafana/tempo/modules/overrides"
Expand All @@ -44,6 +46,7 @@ type Config struct {
Distributor distributor.Config `yaml:"distributor,omitempty"`
IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
Frontend frontend.Config `yaml:"query_frontend,omitempty"`
Compactor compactor.Config `yaml:"compactor,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage,omitempty"`
Expand Down Expand Up @@ -78,6 +81,7 @@ func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
c.Distributor.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "distributor"), f)
c.Ingester.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "ingester"), f)
c.Querier.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "querier"), f)
c.Frontend.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "frontend"), f)
c.Compactor.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "compactor"), f)
c.StorageConfig.RegisterFlagsAndApplyDefaults(tempo_util.PrefixConfig(prefix, "storage"), f)

Expand Down Expand Up @@ -110,6 +114,7 @@ type App struct {
overrides *overrides.Overrides
distributor *distributor.Distributor
querier *querier.Querier
frontend *cortex_frontend.Frontend
compactor *compactor.Compactor
ingester *ingester.Ingester
store storage.Store
Expand Down
79 changes: 62 additions & 17 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,22 @@ import (
"os"

"github.com/cortexproject/cortex/pkg/cortex"
cortex_frontend "github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/modules"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/go-kit/kit/log/level"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"

"github.com/grafana/tempo/modules/compactor"
"github.com/grafana/tempo/modules/distributor"
"github.com/grafana/tempo/modules/frontend"
"github.com/grafana/tempo/modules/ingester"
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/modules/querier"
Expand All @@ -29,16 +32,17 @@ import (

// The various modules that make up tempo.
//
// Each constant is the name a module is registered under with the module
// manager. Every name is declared exactly once; declaring the same identifier
// twice in one const block is a compile error.
const (
	Ring          string = "ring"
	Overrides     string = "overrides"
	Server        string = "server"
	Distributor   string = "distributor"
	Ingester      string = "ingester"
	Querier       string = "querier"
	QueryFrontend string = "query-frontend"
	Compactor     string = "compactor"
	Store         string = "store"
	MemberlistKV  string = "memberlist-kv"
	All           string = "all"
)

func (t *App) initServer() (services.Service, error) {
Expand Down Expand Up @@ -123,6 +127,18 @@ func (t *App) initIngester() (services.Service, error) {
}

func (t *App) initQuerier() (services.Service, error) {
// validate worker config
// if we're not in single binary mode and worker address is not specified - bail
if t.cfg.Target != All && t.cfg.Querier.Worker.Address == "" {
return nil, fmt.Errorf("frontend worker address not specified")
} else if t.cfg.Target == All {
// if we're in single binary mode with no worker address specified, register default endpoint
if t.cfg.Querier.Worker.Address == "" {
t.cfg.Querier.Worker.Address = fmt.Sprintf("127.0.0.1:%d", t.cfg.Server.GRPCListenPort)
level.Warn(util.Logger).Log("msg", "Worker address is empty in single binary mode. Attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.", "address", t.cfg.Querier.Worker.Address)
}
}

// todo: make ingester client a module instead of passing config everywhere
querier, err := querier.New(t.cfg.Querier, t.cfg.IngesterClient, t.ring, t.store, t.overrides)
if err != nil {
Expand All @@ -134,9 +150,36 @@ func (t *App) initQuerier() (services.Service, error) {
t.httpAuthMiddleware,
).Wrap(http.HandlerFunc(t.querier.TraceByIDHandler))

t.server.HTTP.Handle("/querier/api/traces/{traceID}", tracesHandler)
return t.querier, t.querier.CreateAndRegisterWorker(t.server.HTTP)
}

// initQueryFrontend builds the query frontend module.
//
// It creates the Cortex frontend from the embedded frontend config, wraps it
// with the request-splitting tripperware, registers the frontend on the gRPC
// server, and mounts the frontend handler (behind the HTTP auth middleware) at
// /api/traces/{traceID}.
//
// The returned service has no start hook; its stop hook closes the frontend.
// Any error from constructing the frontend or the tripperware is returned
// unchanged.
func (t *App) initQueryFrontend() (services.Service, error) {
	var err error
	t.frontend, err = cortex_frontend.New(t.cfg.Frontend.Config, util.Logger, prometheus.DefaultRegisterer)
	if err != nil {
		return nil, err
	}

	// custom tripperware that splits requests
	tripperware, err := frontend.NewTripperware(t.cfg.Frontend, util.Logger, prometheus.DefaultRegisterer)
	if err != nil {
		return nil, err
	}
	// tripperware will be called before f.roundTripper (which calls roundtripgrpc)
	t.frontend.Wrap(tripperware)

	tracesHandler := middleware.Merge(
		t.httpAuthMiddleware,
	).Wrap(t.frontend.Handler())

	cortex_frontend.RegisterFrontendServer(t.server.GRPC, t.frontend)
	t.server.HTTP.Handle("/api/traces/{traceID}", tracesHandler)

	// Return the frontend's own service rather than the querier: the stop hook
	// is the only place the frontend gets closed.
	return services.NewIdleService(nil, func(_ error) error {
		t.frontend.Close()
		return nil
	}), nil
}

func (t *App) initCompactor() (services.Service, error) {
Expand Down Expand Up @@ -197,6 +240,7 @@ func (t *App) setupModuleManager() error {
mm.RegisterModule(Distributor, t.initDistributor)
mm.RegisterModule(Ingester, t.initIngester)
mm.RegisterModule(Querier, t.initQuerier)
mm.RegisterModule(QueryFrontend, t.initQueryFrontend)
mm.RegisterModule(Compactor, t.initCompactor)
mm.RegisterModule(Store, t.initStore, modules.UserInvisibleModule)
mm.RegisterModule(All, nil)
Expand All @@ -206,12 +250,13 @@ func (t *App) setupModuleManager() error {
// Overrides: nil,
// Store: nil,
// MemberlistKV: nil,
Ring: {Server, MemberlistKV},
Distributor: {Ring, Server, Overrides},
Ingester: {Store, Server, Overrides, MemberlistKV},
Querier: {Store, Ring},
Compactor: {Store, Server, MemberlistKV},
All: {Compactor, Querier, Ingester, Distributor},
QueryFrontend: {Server},
Ring: {Server, MemberlistKV},
Distributor: {Ring, Server, Overrides},
Ingester: {Store, Server, Overrides, MemberlistKV},
Querier: {Store, Ring},
Compactor: {Store, Server, MemberlistKV},
All: {Compactor, QueryFrontend, Querier, Ingester, Distributor},
}

for mod, targets := range deps {
Expand Down
6 changes: 5 additions & 1 deletion integration/e2e/config-microservices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,8 @@ memberlist:
- tempo_e2e-distributor:7946
- tempo_e2e-ingester-1:7946
- tempo_e2e-ingester-2:7946
- tempo_e2e-querier:7946
- tempo_e2e-querier:7946

querier:
frontend_worker:
frontend_address: tempo_e2e-query-frontend:9095
11 changes: 7 additions & 4 deletions integration/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func TestAllInOne(t *testing.T) {
// test metrics
require.NoError(t, tempo.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_ingester_blocks_flushed_total"))
require.NoError(t, tempo.WaitSumMetrics(cortex_e2e.Equals(1), "tempodb_blocklist_length"))
require.NoError(t, tempo.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_query_frontend_queries_total"))

// query trace - should fetch from backend
queryAndAssertTrace(t, "http://"+tempo.Endpoint(3100)+"/api/traces/"+hexID, "my operation", 1)
Expand All @@ -87,8 +88,9 @@ func TestMicroservices(t *testing.T) {
tempoIngester1 := util.NewTempoIngester(1)
tempoIngester2 := util.NewTempoIngester(2)
tempoDistributor := util.NewTempoDistributor()
tempoQueryFrontend := util.NewTempoQueryFrontend()
tempoQuerier := util.NewTempoQuerier()
require.NoError(t, s.StartAndWaitReady(tempoIngester1, tempoIngester2, tempoDistributor, tempoQuerier))
require.NoError(t, s.StartAndWaitReady(tempoIngester1, tempoIngester2, tempoDistributor, tempoQueryFrontend, tempoQuerier))

// wait for 2 active ingesters
time.Sleep(1 * time.Second)
Expand Down Expand Up @@ -124,7 +126,7 @@ func TestMicroservices(t *testing.T) {
require.NoError(t, tempoIngester2.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_ingester_traces_created_total"))

// query an in-memory trace
queryAndAssertTrace(t, "http://"+tempoQuerier.Endpoint(3100)+"/api/traces/"+hexID, "my operation", 1)
queryAndAssertTrace(t, "http://"+tempoQueryFrontend.Endpoint(3100)+"/api/traces/"+hexID, "my operation", 1)

// flush trace to backend
res, err := cortex_e2e.GetRequest("http://" + tempoIngester1.Endpoint(3100) + "/flush")
Expand All @@ -143,9 +145,10 @@ func TestMicroservices(t *testing.T) {
require.NoError(t, tempoIngester1.WaitSumMetrics(cortex_e2e.Equals(2), "tempodb_blocklist_length"))
require.NoError(t, tempoIngester2.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_ingester_blocks_flushed_total"))
require.NoError(t, tempoIngester2.WaitSumMetrics(cortex_e2e.Equals(2), "tempodb_blocklist_length"))
require.NoError(t, tempoQueryFrontend.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_query_frontend_queries_total"))

// query trace - should fetch from backend
queryAndAssertTrace(t, "http://"+tempoQuerier.Endpoint(3100)+"/api/traces/"+hexID, "my operation", 1)
queryAndAssertTrace(t, "http://"+tempoQueryFrontend.Endpoint(3100)+"/api/traces/"+hexID, "my operation", 1)

// stop an ingester and confirm we can still write and query
err = tempoIngester2.Stop()
Expand All @@ -156,7 +159,7 @@ func TestMicroservices(t *testing.T) {
hexID = fmt.Sprintf("%016x%016x", batch.Spans[0].TraceIdHigh, batch.Spans[0].TraceIdLow)

// query an in-memory trace
queryAndAssertTrace(t, "http://"+tempoQuerier.Endpoint(3100)+"/api/traces/"+hexID, "my operation", 1)
queryAndAssertTrace(t, "http://"+tempoQueryFrontend.Endpoint(3100)+"/api/traces/"+hexID, "my operation", 1)

// stop another ingester and confirm things fail
err = tempoIngester1.Stop()
Expand Down
16 changes: 16 additions & 0 deletions integration/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,22 @@ func NewTempoIngester(replica int) *cortex_e2e.HTTPService {
return s
}

// NewTempoQueryFrontend describes the "query-frontend" e2e service: the tempo
// binary started with -target=query-frontend and the shared config file,
// serving HTTP on port 3100 and considered ready once /ready answers with a
// 2xx status. The service uses the backoff returned by tempoBackoff.
func NewTempoQueryFrontend() *cortex_e2e.HTTPService {
	const httpPort = 3100

	configPath := filepath.Join(cortex_e2e.ContainerSharedDir, "config.yaml")
	command := cortex_e2e.NewCommandWithoutEntrypoint(
		"/tempo",
		"-config.file="+configPath,
		"-target=query-frontend",
	)
	readiness := cortex_e2e.NewHTTPReadinessProbe(httpPort, "/ready", 200, 299)

	svc := cortex_e2e.NewHTTPService("query-frontend", image, command, readiness, httpPort)
	svc.SetBackoff(tempoBackoff())
	return svc
}

func NewTempoQuerier() *cortex_e2e.HTTPService {
args := []string{"-config.file=" + filepath.Join(cortex_e2e.ContainerSharedDir, "config.yaml"), "-target=querier"}

Expand Down
20 changes: 20 additions & 0 deletions modules/frontend/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package frontend

import (
"flag"

"github.com/cortexproject/cortex/pkg/querier/frontend"
)

// Config holds the query frontend settings: the Cortex frontend configuration,
// inlined into the same YAML section, plus the tempo-specific shard count.
type Config struct {
	// Embedded Cortex frontend config; its fields appear at the same YAML level.
	frontend.Config `yaml:",inline"`
	// QueryShards is handed to ShardingWare when the tripperware is built.
	// RegisterFlagsAndApplyDefaults sets it to 4.
	QueryShards int `yaml:"query_shards,omitempty"`
}

// RegisterFlagsAndApplyDefaults fills cfg with the query frontend defaults.
//
// Neither prefix nor f is consulted: this method registers no command-line
// flags, it only assigns default values.
func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
	// Tempo-specific default.
	cfg.QueryShards = 4

	// Defaults for the embedded Cortex frontend configuration.
	embedded := &cfg.Config
	embedded.MaxOutstandingPerTenant = 100
	embedded.LogQueriesLongerThan = 0
	embedded.DownstreamURL = ""
	embedded.CompressResponses = true
}
88 changes: 88 additions & 0 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package frontend

import (
"net/http"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/querier/frontend"
)

// NewTripperware builds the query frontend tripperware. The returned
// tripperware wraps a downstream round tripper so that every request is
// counted per tenant, traced under a "frontend.RoundTrip" span, and then
// passed through the sharding middleware configured with cfg.QueryShards.
//
// The per-tenant counter is created against registerer once, when
// NewTripperware is called. The returned error is always nil.
func NewTripperware(cfg Config, logger log.Logger, registerer prometheus.Registerer) (frontend.Tripperware, error) {
	level.Info(logger).Log("msg", "creating tripperware in query frontend to shard queries")

	counterOpts := prometheus.CounterOpts{
		Namespace: "tempo",
		Name:      "query_frontend_queries_total",
		Help:      "Total queries received per tenant.",
	}
	queriesPerTenant := promauto.With(registerer).NewCounterVec(counterOpts, []string{"tenant"})

	tripperware := func(next http.RoundTripper) http.RoundTripper {
		// Round tripper that runs the request through the sharding middleware
		// before handing it to the downstream round tripper.
		sharded := NewRoundTripper(next, ShardingWare(cfg.QueryShards, logger))

		instrumented := func(r *http.Request) (*http.Response, error) {
			// The extraction error is deliberately discarded; the counter is
			// labelled with whatever tenant value ExtractOrgID returned.
			tenantID, _ := user.ExtractOrgID(r.Context())
			queriesPerTenant.WithLabelValues(tenantID).Inc()

			// tracing instrumentation
			span, ctx := opentracing.StartSpanFromContext(r.Context(), "frontend.RoundTrip")
			defer span.Finish()

			return sharded.RoundTrip(r.WithContext(ctx))
		}
		return frontend.RoundTripFunc(instrumented)
	}
	return tripperware, nil
}

// Handler turns an HTTP request into a response or an error.
type Handler interface {
	Do(*http.Request) (*http.Response, error)
}

// Middleware decorates a Handler, returning the decorated Handler.
type Middleware interface {
	Wrap(Handler) Handler
}

// MiddlewareFunc adapts a plain function to the Middleware interface, in the
// same way http.HandlerFunc adapts a function to http.Handler.
type MiddlewareFunc func(Handler) Handler

// Wrap implements Middleware by calling the function itself.
func (fn MiddlewareFunc) Wrap(h Handler) Handler {
	return fn(h)
}

// MergeMiddlewares combines the given middlewares into a single Middleware.
// Wrapping is applied from the last middleware to the first, so the first
// argument ends up outermost. With no middlewares the handler is returned
// untouched.
func MergeMiddlewares(middleware ...Middleware) Middleware {
	compose := func(innermost Handler) Handler {
		wrapped := innermost
		for idx := range middleware {
			// Walk the slice back to front.
			wrapped = middleware[len(middleware)-1-idx].Wrap(wrapped)
		}
		return wrapped
	}
	return MiddlewareFunc(compose)
}

type roundTripper struct {
next http.RoundTripper
handler Handler
}

// NewRoundTripper merges a set of middlewares into an handler, then inject it into the `next` roundtripper
func NewRoundTripper(next http.RoundTripper, middlewares ...Middleware) http.RoundTripper {
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
transport := roundTripper{
next: next,
}
transport.handler = MergeMiddlewares(middlewares...).Wrap(&transport)
return transport
}

func (q roundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
return q.handler.Do(r)
}

// Do implements Handler.
func (q roundTripper) Do(r *http.Request) (*http.Response, error) {
return q.next.RoundTrip(r)
}
Loading