From 7208caf2b9ad9eb57738fd6b00f413d86b0b4348 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Wed, 15 Sep 2021 11:40:08 -0600 Subject: [PATCH] Feature: allow querier and query frontend targets to run on same process (#4301) * allow querier and query frontend to both the registered in same process Signed-off-by: Trevor Whitney * rename QuerierWorkerService to WorkerService Signed-off-by: Trevor Whitney * add debug log for standalone evaluation Signed-off-by: Trevor Whitney * inline generation of querier.WorkerServiceConfig Co-authored-by: Ed Welch * Clarify auth middleware test, inline config gen * Fix typo Signed-off-by: Trevor Whitney * reconcile go mod with HEAD of main Signed-off-by: Trevor Whitney Co-authored-by: Ed Welch --- go.mod | 2 + pkg/loki/loki.go | 2 +- pkg/loki/modules.go | 66 ++++----- pkg/querier/worker_service.go | 146 ++++++++++++++++++ pkg/querier/worker_service_test.go | 228 +++++++++++++++++++++++++++++ vendor/modules.txt | 2 + 6 files changed, 407 insertions(+), 39 deletions(-) create mode 100644 pkg/querier/worker_service.go create mode 100644 pkg/querier/worker_service_test.go diff --git a/go.mod b/go.mod index 214763066da2..0cac24c01ed7 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c github.com/fsouza/fake-gcs-server v1.7.0 github.com/go-kit/kit v0.11.0 + github.com/go-kit/log v0.1.0 github.com/go-logfmt/logfmt v0.5.0 github.com/go-redis/redis/v8 v8.9.0 github.com/gocql/gocql v0.0.0-20200526081602-cd04bd7f22a7 @@ -66,6 +67,7 @@ require ( github.com/ncw/swift v1.0.52 github.com/oklog/ulid v1.3.1 github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e + github.com/opentracing-contrib/go-stdlib v1.0.0 github.com/opentracing/opentracing-go v1.2.0 // github.com/pierrec/lz4 v2.0.5+incompatible github.com/pierrec/lz4/v4 v4.1.7 diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index eadaf72ed1fd..1f59e11d2d28 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -411,7 +411,7 @@ func (t *Loki) setupModuleManager() error { Compactor: {Server, Overrides}, IndexGateway: {Server}, IngesterQuerier: {Ring}, - All: {Querier, Ingester, Distributor, TableManager, Ruler}, + All: {QueryFrontend, Querier, Ingester, Distributor, TableManager, Ruler}, } // Add IngesterQuerier as a dependency for store when target is either ingester or querier. diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 7a3981eb8c83..7d60474fbaf8 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -16,7 +16,6 @@ import ( "github.com/cortexproject/cortex/pkg/frontend/transport" "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" - cortex_querier_worker "github.com/cortexproject/cortex/pkg/querier/worker" "github.com/cortexproject/cortex/pkg/ring" cortex_ruler "github.com/cortexproject/cortex/pkg/ruler" "github.com/cortexproject/cortex/pkg/scheduler" @@ -29,7 +28,6 @@ import ( "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/thanos/pkg/discovery/dns" - httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" "github.com/weaveworks/common/user" @@ -190,51 +188,43 @@ func (t *Loki) initDistributor() (services.Service, error) { } func (t *Loki) initQuerier() (services.Service, error) { - var ( - worker services.Service - err error - ) - - // NewQuerierWorker now expects Frontend (or Scheduler) address to be set. - if t.Cfg.Worker.FrontendAddress != "" || t.Cfg.Worker.SchedulerAddress != "" { - t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent - level.Debug(util_log.Logger).Log("msg", "initializing querier worker", "config", fmt.Sprintf("%+v", t.Cfg.Worker)) - worker, err = cortex_querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(t.Server.HTTPServer.Handler), util_log.Logger, prometheus.DefaultRegisterer) - if err != nil { - return nil, err - } - } - if t.Cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 { t.Cfg.Querier.IngesterQueryStoreMaxLookback = t.Cfg.Ingester.QueryStoreMaxLookBackPeriod } + + var err error t.Querier, err = querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides) if err != nil { return nil, err } - httpMiddleware := middleware.Merge( - serverutil.RecoveryHTTPMiddleware, - t.HTTPAuthMiddleware, - serverutil.NewPrepopulateMiddleware(), - serverutil.ResponseJSONMiddleware(), + querierWorkerServiceConfig := querier.WorkerServiceConfig{ + AllEnabled: t.Cfg.isModuleEnabled(All), + GrpcListenPort: t.Cfg.Server.GRPCListenPort, + QuerierMaxConcurrent: t.Cfg.Querier.MaxConcurrent, + QuerierWorkerConfig: &t.Cfg.Worker, + QueryFrontendEnabled: t.Cfg.isModuleEnabled(QueryFrontend), + QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler), + } + + var queryHandlers = map[string]http.Handler{ + "/loki/api/v1/query_range": http.HandlerFunc(t.Querier.RangeQueryHandler), + "/loki/api/v1/query": http.HandlerFunc(t.Querier.InstantQueryHandler), + "/loki/api/v1/label": http.HandlerFunc(t.Querier.LabelHandler), + "/loki/api/v1/labels": http.HandlerFunc(t.Querier.LabelHandler), + "/loki/api/v1/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler), + "/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler), + "/loki/api/v1/series": http.HandlerFunc(t.Querier.SeriesHandler), + + "/api/prom/query": http.HandlerFunc(t.Querier.LogQueryHandler), + "/api/prom/label": http.HandlerFunc(t.Querier.LabelHandler), + "/api/prom/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler), + "/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler), + "/api/prom/series": http.HandlerFunc(t.Querier.SeriesHandler), + } + return querier.InitWorkerService( + querierWorkerServiceConfig, queryHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware, ) - t.Server.HTTP.Handle("/loki/api/v1/query_range", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.RangeQueryHandler))) - t.Server.HTTP.Handle("/loki/api/v1/query", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.InstantQueryHandler))) - // Prometheus compatibility requires `loki/api/v1/labels` however we already released `loki/api/v1/label` - // which is a little more consistent with `/loki/api/v1/label/{name}/values` so we are going to handle both paths. - t.Server.HTTP.Handle("/loki/api/v1/label", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LabelHandler))) - t.Server.HTTP.Handle("/loki/api/v1/labels", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LabelHandler))) - t.Server.HTTP.Handle("/loki/api/v1/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LabelHandler))) - t.Server.HTTP.Handle("/loki/api/v1/tail", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.TailHandler))) - t.Server.HTTP.Handle("/loki/api/v1/series", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.SeriesHandler))) - - t.Server.HTTP.Handle("/api/prom/query", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LogQueryHandler))) - t.Server.HTTP.Handle("/api/prom/label", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LabelHandler))) - t.Server.HTTP.Handle("/api/prom/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LabelHandler))) - t.Server.HTTP.Handle("/api/prom/tail", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.TailHandler))) - t.Server.HTTP.Handle("/api/prom/series", httpMiddleware.Wrap(http.HandlerFunc(t.Querier.SeriesHandler))) - return worker, nil // ok if worker is nil here } func (t *Loki) initIngester() (_ services.Service, err error) { diff --git a/pkg/querier/worker_service.go b/pkg/querier/worker_service.go new file mode 100644 index 000000000000..6650ae24c747 --- /dev/null +++ b/pkg/querier/worker_service.go @@ -0,0 +1,146 @@ +package querier + +import ( + "fmt" + "net/http" + + querier_worker "github.com/cortexproject/cortex/pkg/querier/worker" + util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/go-kit/log/level" + "github.com/gorilla/mux" + "github.com/grafana/dskit/services" + "github.com/opentracing-contrib/go-stdlib/nethttp" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" + "github.com/weaveworks/common/middleware" + + serverutil "github.com/grafana/loki/pkg/util/server" +) + +type WorkerServiceConfig struct { + AllEnabled bool + GrpcListenPort int + QuerierMaxConcurrent int + QuerierWorkerConfig *querier_worker.Config + QueryFrontendEnabled bool + QuerySchedulerEnabled bool +} + +// InitWorkerService takes a config object, a map of routes to handlers, an external http router and external +// http handler, and an auth middleware wrapper. This function creates an internal HTTP router that responds to all +// the provided query routes/handlers. This router can either be registered with the external Loki HTTP server, or +// be used internally by a querier worker so that it does not conflict with the routes registered by the Query Frontend module. +// +// 1. Query-Frontend Enabled: If Loki has an All or QueryFrontend target, the internal +// HTTP router is wrapped with Tenant ID parsing middleware and passed to the frontend +// worker. +// +// 2. Querier Standalone: The querier will register the internal HTTP router with the external +// HTTP router for the Prometheus API routes. Then the external HTTP server will be passed +// as a http.Handler to the frontend worker. +// +func InitWorkerService( + cfg WorkerServiceConfig, + queryRoutesToHandlers map[string]http.Handler, + externalRouter *mux.Router, + externalHandler http.Handler, + authMiddleware middleware.Interface, +) (serve services.Service, err error) { + + internalRouter := mux.NewRouter() + for route, handler := range queryRoutesToHandlers { + internalRouter.Handle(route, handler) + } + + // If the querier is running standalone without the query-frontend or query-scheduler, we must register the internal + // HTTP handler externally (as it's the only handler that needs to register on querier routes) and provide the + // external Loki Server HTTP handler to the frontend worker to ensure requests it processes use the default + // middleware instrumentation. + if querierRunningStandalone(cfg) { + + // First, register the internal querier handler with the external HTTP server + routes := make([]string, len(queryRoutesToHandlers)) + var idx = 0 + for route := range queryRoutesToHandlers { + routes[idx] = route + idx++ + } + + registerRoutesExternally(routes, externalRouter, internalRouter, authMiddleware) + + //If no frontend or scheduler address has been configured, then there is no place for the + //querier worker to request work from, so no need to start a worker service + if (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" { + return nil, nil + } + + // If a frontend or scheduler address has been configured, return a querier worker service that uses + // the external Loki Server HTTP server, which has now has the internal handler's routes registered with it + return querier_worker.NewQuerierWorker( + *(cfg.QuerierWorkerConfig), httpgrpc_server.NewServer(externalHandler), util_log.Logger, prometheus.DefaultRegisterer) + } + + // Since we must be running a querier with either a frontend and/or scheduler at this point, if no frontend or scheduler address + // is configured, Loki will default to using the frontend on localhost on it's own GRPC listening port. + if (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" { + address := fmt.Sprintf("127.0.0.1:%d", cfg.GrpcListenPort) + level.Warn(util_log.Logger).Log( + "msg", "Worker address is empty, attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.", + "address", address) + cfg.QuerierWorkerConfig.FrontendAddress = address + } + + // Add a middleware to extract the trace context and add a header. + var internalHandler http.Handler + internalHandler = nethttp.MiddlewareFunc( + opentracing.GlobalTracer(), + internalRouter.ServeHTTP, + nethttp.OperationNameFunc(func(r *http.Request) string { + return "internalQuerier" + })) + + // If queries are processed using the external HTTP Server, we need wrap the internal querier with + // HTTP router with middleware to parse the tenant ID from the HTTP header and inject it into the + // request context, as well as make sure any x-www-url-formencoded params are correctly parsed + httpMiddleware := middleware.Merge( + authMiddleware, + serverutil.NewPrepopulateMiddleware(), + ) + + internalHandler = httpMiddleware.Wrap(internalHandler) + + //Querier worker's max concurrent requests must be the same as the querier setting + (*cfg.QuerierWorkerConfig).MaxConcurrentRequests = cfg.QuerierMaxConcurrent + + //Return a querier worker pointed to the internal querier HTTP handler so there is not a conflict in routes between the querier + //and the query frontend + return querier_worker.NewQuerierWorker( + *(cfg.QuerierWorkerConfig), httpgrpc_server.NewServer(internalHandler), util_log.Logger, prometheus.DefaultRegisterer) +} + +func registerRoutesExternally(routes []string, externalRouter *mux.Router, internalHandler http.Handler, authMiddleware middleware.Interface) { + httpMiddleware := middleware.Merge( + serverutil.RecoveryHTTPMiddleware, + authMiddleware, + serverutil.NewPrepopulateMiddleware(), + serverutil.ResponseJSONMiddleware(), + ) + + for _, route := range routes { + externalRouter.Handle(route, httpMiddleware.Wrap(internalHandler)) + } +} + +func querierRunningStandalone(cfg WorkerServiceConfig) bool { + runningStandalone := !cfg.QueryFrontendEnabled && !cfg.QuerySchedulerEnabled && !cfg.AllEnabled + level.Debug(util_log.Logger).Log( + "msg", "determining if querier is running as standalone target", + "runningStandalone", runningStandalone, + "queryFrontendEnabled", cfg.QueryFrontendEnabled, + "queryScheduleEnabled", cfg.QuerySchedulerEnabled, + "allEnabled", cfg.AllEnabled, + ) + + return runningStandalone +} diff --git a/pkg/querier/worker_service_test.go b/pkg/querier/worker_service_test.go new file mode 100644 index 000000000000..f11411d2f374 --- /dev/null +++ b/pkg/querier/worker_service_test.go @@ -0,0 +1,228 @@ +package querier + +import ( + "net/http" + "net/http/httptest" + "testing" + + querier_worker "github.com/cortexproject/cortex/pkg/querier/worker" + "github.com/gorilla/mux" + "github.com/grafana/dskit/services" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/middleware" +) + +func Test_InitQuerierService(t *testing.T) { + var mockQueryHandlers = map[string]http.Handler{ + "/loki/api/v1/query": http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + _, err := res.Write([]byte("test handler")) + require.NoError(t, err) + }), + } + + testContext := func(config WorkerServiceConfig, authMiddleware middleware.Interface) (*mux.Router, services.Service) { + externalRouter := mux.NewRouter() + + if authMiddleware == nil { + authMiddleware = middleware.Identity + } + + querierWorkerService, err := InitWorkerService( + config, + mockQueryHandlers, + externalRouter, + http.HandlerFunc(externalRouter.ServeHTTP), + authMiddleware, + ) + require.NoError(t, err) + + return externalRouter, querierWorkerService + } + + t.Run("when querier is configured to run standalone, without a query frontend", func(t *testing.T) { + t.Run("register the internal query handlers externally", func(t *testing.T) { + config := WorkerServiceConfig{ + QueryFrontendEnabled: false, + QuerySchedulerEnabled: false, + AllEnabled: false, + QuerierWorkerConfig: &querier_worker.Config{}, + } + + externalRouter, _ := testContext(config, nil) + + recorder := httptest.NewRecorder() + request := httptest.NewRequest("GET", "/loki/api/v1/query", nil) + externalRouter.ServeHTTP(recorder, request) + assert.Equal(t, 200, recorder.Code) + assert.Equal(t, "test handler", recorder.Body.String()) + }) + + t.Run("wrap external handler with auth middleware", func(t *testing.T) { + config := WorkerServiceConfig{ + QueryFrontendEnabled: false, + QuerySchedulerEnabled: false, + AllEnabled: false, + QuerierWorkerConfig: &querier_worker.Config{}, + } + + requestedAuthenticated := false + mockAuthMiddleware := middleware.Func(func(next http.Handler) http.Handler { + requestedAuthenticated = true + return next + }) + + externalRouter, _ := testContext(config, mockAuthMiddleware) + + recorder := httptest.NewRecorder() + request := httptest.NewRequest("GET", "/loki/api/v1/query", nil) + externalRouter.ServeHTTP(recorder, request) + assert.True(t, requestedAuthenticated) + }) + + t.Run("wrap external handler with response json middleware", func(t *testing.T) { + config := WorkerServiceConfig{ + QueryFrontendEnabled: false, + QuerySchedulerEnabled: false, + AllEnabled: false, + QuerierWorkerConfig: &querier_worker.Config{}, + } + + externalRouter, _ := testContext(config, nil) + + recorder := httptest.NewRecorder() + request := httptest.NewRequest("GET", "/loki/api/v1/query", nil) + externalRouter.ServeHTTP(recorder, request) + + contentTypeHeader := recorder.Header().Get("Content-Type") + assert.Equal(t, "application/json; charset=UTF-8", contentTypeHeader) + }) + + t.Run("do not create a querier worker service if neither frontend address nor scheduler address has been configured", func(t *testing.T) { + config := WorkerServiceConfig{ + QueryFrontendEnabled: false, + QuerySchedulerEnabled: false, + AllEnabled: false, + QuerierWorkerConfig: &querier_worker.Config{}, + } + + _, workerService := testContext(config, nil) + assert.Nil(t, workerService) + }) + + t.Run("return a querier worker service if frontend or scheduler address has been configured", func(t *testing.T) { + withFrontendConfig := WorkerServiceConfig{ + QuerierWorkerConfig: &querier_worker.Config{ + FrontendAddress: "http://example.com", + }, + } + withSchedulerConfig := WorkerServiceConfig{ + QuerierWorkerConfig: &querier_worker.Config{ + SchedulerAddress: "http://example.com", + }, + } + + for _, config := range []WorkerServiceConfig{ + withFrontendConfig, + withSchedulerConfig, + } { + _, workerService := testContext(config, nil) + assert.NotNil(t, workerService) + } + }) + }) + + t.Run("when query frontend, scheduler, or all target is enabled", func(t *testing.T) { + defaultWorkerConfig := querier_worker.Config{} + nonStandaloneTargetPermutations := []WorkerServiceConfig{ + { + QueryFrontendEnabled: true, + QuerySchedulerEnabled: false, + AllEnabled: false, + QuerierWorkerConfig: &defaultWorkerConfig, + }, + { + QueryFrontendEnabled: false, + QuerySchedulerEnabled: true, + AllEnabled: false, + QuerierWorkerConfig: &defaultWorkerConfig, + }, + { + QueryFrontendEnabled: false, + QuerySchedulerEnabled: false, + AllEnabled: true, + QuerierWorkerConfig: &defaultWorkerConfig, + }, + { + QueryFrontendEnabled: true, + QuerySchedulerEnabled: true, + AllEnabled: false, + QuerierWorkerConfig: &defaultWorkerConfig, + }, + { + QueryFrontendEnabled: true, + QuerySchedulerEnabled: false, + AllEnabled: true, + QuerierWorkerConfig: &defaultWorkerConfig, + }, + { + QueryFrontendEnabled: false, + QuerySchedulerEnabled: true, + AllEnabled: true, + QuerierWorkerConfig: &defaultWorkerConfig, + }, + { + QueryFrontendEnabled: true, + QuerySchedulerEnabled: true, + AllEnabled: true, + QuerierWorkerConfig: &defaultWorkerConfig, + }, + } + + t.Run("do not register the internal query handler externally", func(t *testing.T) { + for _, config := range nonStandaloneTargetPermutations { + externalRouter, _ := testContext(config, nil) + recorder := httptest.NewRecorder() + request := httptest.NewRequest("GET", "/loki/api/v1/query", nil) + externalRouter.ServeHTTP(recorder, request) + assert.Equal(t, 404, recorder.Code) + } + }) + + t.Run("use localhost as the worker address if none is set", func(t *testing.T) { + for _, config := range nonStandaloneTargetPermutations { + workerConfig := querier_worker.Config{} + config.QuerierWorkerConfig = &workerConfig + config.GrpcListenPort = 1234 + + testContext(config, nil) + + assert.Equal(t, "127.0.0.1:1234", workerConfig.FrontendAddress) + } + }) + + t.Run("set the worker's max concurrent request to the same as the max concurrent setting for the querier", func(t *testing.T) { + for _, config := range nonStandaloneTargetPermutations { + workerConfig := querier_worker.Config{} + config.QuerierWorkerConfig = &workerConfig + config.QuerierMaxConcurrent = 42 + + testContext(config, nil) + + assert.Equal(t, 42, workerConfig.MaxConcurrentRequests) + } + }) + + t.Run("always return a query worker service", func(t *testing.T) { + for _, config := range nonStandaloneTargetPermutations { + workerConfig := querier_worker.Config{} + config.QuerierWorkerConfig = &workerConfig + config.GrpcListenPort = 1234 + + _, querierWorkerService := testContext(config, nil) + + assert.NotNil(t, querierWorkerService) + } + }) + }) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 3b1d1da0c426..996b4292dc57 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -417,6 +417,7 @@ github.com/fsouza/fake-gcs-server/internal/backend github.com/go-kit/kit/log github.com/go-kit/kit/log/level # github.com/go-kit/log v0.1.0 +## explicit github.com/go-kit/log github.com/go-kit/log/level # github.com/go-logfmt/logfmt v0.5.0 @@ -765,6 +766,7 @@ github.com/opencontainers/image-spec/specs-go/v1 ## explicit github.com/opentracing-contrib/go-grpc # github.com/opentracing-contrib/go-stdlib v1.0.0 +## explicit github.com/opentracing-contrib/go-stdlib/nethttp # github.com/opentracing/opentracing-go v1.2.0 ## explicit