Skip to content

Commit

Permalink
Feature: allow querier and query frontend targets to run on same proc…
Browse files Browse the repository at this point in the history
…ess (#4301)

* allow querier and query frontend to both the registered in same process

Signed-off-by: Trevor Whitney <trevorjwhitney@gmail.com>

* rename QuerierWorkerService to WorkerService

Signed-off-by: Trevor Whitney <trevorjwhitney@gmail.com>

* add debug log for standalone evaluation

Signed-off-by: Trevor Whitney <trevorjwhitney@gmail.com>

* inline generation of querier.WorkerServiceConfig

Co-authored-by: Ed Welch <edward.welch@grafana.com>

* Clarify auth middleware test, inline config gen

* Fix typo

Signed-off-by: Trevor Whitney <trevorjwhitney@gmail.com>

* reconcile go mod with HEAD of main

Signed-off-by: Trevor Whitney <trevorjwhitney@gmail.com>

Co-authored-by: Ed Welch <edward.welch@grafana.com>
  • Loading branch information
trevorwhitney and slim-bean authored Sep 15, 2021
1 parent f48e867 commit 7208caf
Show file tree
Hide file tree
Showing 6 changed files with 407 additions and 39 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c
github.com/fsouza/fake-gcs-server v1.7.0
github.com/go-kit/kit v0.11.0
github.com/go-kit/log v0.1.0
github.com/go-logfmt/logfmt v0.5.0
github.com/go-redis/redis/v8 v8.9.0
github.com/gocql/gocql v0.0.0-20200526081602-cd04bd7f22a7
Expand Down Expand Up @@ -66,6 +67,7 @@ require (
github.com/ncw/swift v1.0.52
github.com/oklog/ulid v1.3.1
github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e
github.com/opentracing-contrib/go-stdlib v1.0.0
github.com/opentracing/opentracing-go v1.2.0
// github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pierrec/lz4/v4 v4.1.7
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (t *Loki) setupModuleManager() error {
Compactor: {Server, Overrides},
IndexGateway: {Server},
IngesterQuerier: {Ring},
All: {Querier, Ingester, Distributor, TableManager, Ruler},
All: {QueryFrontend, Querier, Ingester, Distributor, TableManager, Ruler},
}

// Add IngesterQuerier as a dependency for store when target is either ingester or querier.
Expand Down
66 changes: 28 additions & 38 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
cortex_querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/ring"
cortex_ruler "github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/scheduler"
Expand All @@ -29,7 +28,6 @@ import (
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/discovery/dns"
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"github.com/weaveworks/common/user"
Expand Down Expand Up @@ -190,51 +188,43 @@ func (t *Loki) initDistributor() (services.Service, error) {
}

func (t *Loki) initQuerier() (services.Service, error) {
var (
worker services.Service
err error
)

// NewQuerierWorker now expects Frontend (or Scheduler) address to be set.
if t.Cfg.Worker.FrontendAddress != "" || t.Cfg.Worker.SchedulerAddress != "" {
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
level.Debug(util_log.Logger).Log("msg", "initializing querier worker", "config", fmt.Sprintf("%+v", t.Cfg.Worker))
worker, err = cortex_querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(t.Server.HTTPServer.Handler), util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
}

if t.Cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 {
t.Cfg.Querier.IngesterQueryStoreMaxLookback = t.Cfg.Ingester.QueryStoreMaxLookBackPeriod
}

var err error
t.Querier, err = querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides)
if err != nil {
return nil, err
}

httpMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
t.HTTPAuthMiddleware,
serverutil.NewPrepopulateMiddleware(),
serverutil.ResponseJSONMiddleware(),
querierWorkerServiceConfig := querier.WorkerServiceConfig{
AllEnabled: t.Cfg.isModuleEnabled(All),
GrpcListenPort: t.Cfg.Server.GRPCListenPort,
QuerierMaxConcurrent: t.Cfg.Querier.MaxConcurrent,
QuerierWorkerConfig: &t.Cfg.Worker,
QueryFrontendEnabled: t.Cfg.isModuleEnabled(QueryFrontend),
QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler),
}

var queryHandlers = map[string]http.Handler{
"/loki/api/v1/query_range": http.HandlerFunc(t.Querier.RangeQueryHandler),
"/loki/api/v1/query": http.HandlerFunc(t.Querier.InstantQueryHandler),
"/loki/api/v1/label": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/labels": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler),
"/loki/api/v1/series": http.HandlerFunc(t.Querier.SeriesHandler),

"/api/prom/query": http.HandlerFunc(t.Querier.LogQueryHandler),
"/api/prom/label": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler),
"/api/prom/series": http.HandlerFunc(t.Querier.SeriesHandler),
}
return querier.InitWorkerService(
querierWorkerServiceConfig, queryHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware,
)
t.Server.HTTP.Handle("/loki/api/v1/query_range", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.RangeQueryHandler)))
t.Server.HTTP.Handle("/loki/api/v1/query", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.InstantQueryHandler)))
// Prometheus compatibility requires `loki/api/v1/labels` however we already released `loki/api/v1/label`
// which is a little more consistent with `/loki/api/v1/label/{name}/values` so we are going to handle both paths.
t.Server.HTTP.Handle("/loki/api/v1/label", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LabelHandler)))
t.Server.HTTP.Handle("/loki/api/v1/labels", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LabelHandler)))
t.Server.HTTP.Handle("/loki/api/v1/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LabelHandler)))
t.Server.HTTP.Handle("/loki/api/v1/tail", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.TailHandler)))
t.Server.HTTP.Handle("/loki/api/v1/series", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.SeriesHandler)))

t.Server.HTTP.Handle("/api/prom/query", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LogQueryHandler)))
t.Server.HTTP.Handle("/api/prom/label", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LabelHandler)))
t.Server.HTTP.Handle("/api/prom/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LabelHandler)))
t.Server.HTTP.Handle("/api/prom/tail", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.TailHandler)))
t.Server.HTTP.Handle("/api/prom/series", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.SeriesHandler)))
return worker, nil // ok if worker is nil here
}

func (t *Loki) initIngester() (_ services.Service, err error) {
Expand Down
146 changes: 146 additions & 0 deletions pkg/querier/worker_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package querier

import (
"fmt"
"net/http"

querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
"github.com/grafana/dskit/services"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/middleware"

serverutil "github.com/grafana/loki/pkg/util/server"
)

type WorkerServiceConfig struct {
AllEnabled bool
GrpcListenPort int
QuerierMaxConcurrent int
QuerierWorkerConfig *querier_worker.Config
QueryFrontendEnabled bool
QuerySchedulerEnabled bool
}

// InitWorkerService takes a config object, a map of routes to handlers, an external http router and external
// http handler, and an auth middleware wrapper. This function creates an internal HTTP router that responds to all
// the provided query routes/handlers. This router can either be registered with the external Loki HTTP server, or
// be used internally by a querier worker so that it does not conflict with the routes registered by the Query Frontend module.
//
// 1. Query-Frontend Enabled: If Loki has an All or QueryFrontend target, the internal
// HTTP router is wrapped with Tenant ID parsing middleware and passed to the frontend
// worker.
//
// 2. Querier Standalone: The querier will register the internal HTTP router with the external
// HTTP router for the Prometheus API routes. Then the external HTTP server will be passed
// as a http.Handler to the frontend worker.
//
func InitWorkerService(
cfg WorkerServiceConfig,
queryRoutesToHandlers map[string]http.Handler,
externalRouter *mux.Router,
externalHandler http.Handler,
authMiddleware middleware.Interface,
) (serve services.Service, err error) {

internalRouter := mux.NewRouter()
for route, handler := range queryRoutesToHandlers {
internalRouter.Handle(route, handler)
}

// If the querier is running standalone without the query-frontend or query-scheduler, we must register the internal
// HTTP handler externally (as it's the only handler that needs to register on querier routes) and provide the
// external Loki Server HTTP handler to the frontend worker to ensure requests it processes use the default
// middleware instrumentation.
if querierRunningStandalone(cfg) {

// First, register the internal querier handler with the external HTTP server
routes := make([]string, len(queryRoutesToHandlers))
var idx = 0
for route := range queryRoutesToHandlers {
routes[idx] = route
idx++
}

registerRoutesExternally(routes, externalRouter, internalRouter, authMiddleware)

//If no frontend or scheduler address has been configured, then there is no place for the
//querier worker to request work from, so no need to start a worker service
if (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" {
return nil, nil
}

// If a frontend or scheduler address has been configured, return a querier worker service that uses
// the external Loki Server HTTP server, which has now has the internal handler's routes registered with it
return querier_worker.NewQuerierWorker(
*(cfg.QuerierWorkerConfig), httpgrpc_server.NewServer(externalHandler), util_log.Logger, prometheus.DefaultRegisterer)
}

// Since we must be running a querier with either a frontend and/or scheduler at this point, if no frontend or scheduler address
// is configured, Loki will default to using the frontend on localhost on it's own GRPC listening port.
if (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" {
address := fmt.Sprintf("127.0.0.1:%d", cfg.GrpcListenPort)
level.Warn(util_log.Logger).Log(
"msg", "Worker address is empty, attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.",
"address", address)
cfg.QuerierWorkerConfig.FrontendAddress = address
}

// Add a middleware to extract the trace context and add a header.
var internalHandler http.Handler
internalHandler = nethttp.MiddlewareFunc(
opentracing.GlobalTracer(),
internalRouter.ServeHTTP,
nethttp.OperationNameFunc(func(r *http.Request) string {
return "internalQuerier"
}))

// If queries are processed using the external HTTP Server, we need wrap the internal querier with
// HTTP router with middleware to parse the tenant ID from the HTTP header and inject it into the
// request context, as well as make sure any x-www-url-formencoded params are correctly parsed
httpMiddleware := middleware.Merge(
authMiddleware,
serverutil.NewPrepopulateMiddleware(),
)

internalHandler = httpMiddleware.Wrap(internalHandler)

//Querier worker's max concurrent requests must be the same as the querier setting
(*cfg.QuerierWorkerConfig).MaxConcurrentRequests = cfg.QuerierMaxConcurrent

//Return a querier worker pointed to the internal querier HTTP handler so there is not a conflict in routes between the querier
//and the query frontend
return querier_worker.NewQuerierWorker(
*(cfg.QuerierWorkerConfig), httpgrpc_server.NewServer(internalHandler), util_log.Logger, prometheus.DefaultRegisterer)
}

func registerRoutesExternally(routes []string, externalRouter *mux.Router, internalHandler http.Handler, authMiddleware middleware.Interface) {
httpMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
authMiddleware,
serverutil.NewPrepopulateMiddleware(),
serverutil.ResponseJSONMiddleware(),
)

for _, route := range routes {
externalRouter.Handle(route, httpMiddleware.Wrap(internalHandler))
}
}

func querierRunningStandalone(cfg WorkerServiceConfig) bool {
runningStandalone := !cfg.QueryFrontendEnabled && !cfg.QuerySchedulerEnabled && !cfg.AllEnabled
level.Debug(util_log.Logger).Log(
"msg", "determining if querier is running as standalone target",
"runningStandalone", runningStandalone,
"queryFrontendEnabled", cfg.QueryFrontendEnabled,
"queryScheduleEnabled", cfg.QuerySchedulerEnabled,
"allEnabled", cfg.AllEnabled,
)

return runningStandalone
}
Loading

0 comments on commit 7208caf

Please sign in to comment.