diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 559a17f51d7e..0acc3d2f708a 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -209,6 +209,7 @@ type Loki struct { // set during initialization ModuleManager *modules.Manager serviceMap map[string]services.Service + deps map[string][]string Server *server.Server ring *ring.Ring @@ -481,7 +482,34 @@ func (t *Loki) setupModuleManager() error { } } + t.deps = deps t.ModuleManager = mm return nil } + +func (t *Loki) isModuleActive(m string) bool { + for _, target := range t.Cfg.Target { + if target == m { + return true + } + if t.recursiveIsModuleActive(target, m) { + return true + } + } + return false +} + +func (t *Loki) recursiveIsModuleActive(target, m string) bool { + if targetDeps, ok := t.deps[target]; ok { + for _, dep := range targetDeps { + if dep == m { + return true + } + if t.recursiveIsModuleActive(dep, m) { + return true + } + } + } + return false +} diff --git a/pkg/loki/loki_test.go b/pkg/loki/loki_test.go index 19d45d482774..e02fb184c484 100644 --- a/pkg/loki/loki_test.go +++ b/pkg/loki/loki_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/grafana/dskit/flagext" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -52,3 +54,35 @@ func TestFlagDefaults(t *testing.T) { require.Equal(t, c.Server.GRPCServerPingWithoutStreamAllowed, true) require.Contains(t, gotFlags[flagToCheck], "(default true)") } + +func TestLoki_isModuleEnabled(t1 *testing.T) { + tests := []struct { + name string + target flagext.StringSliceCSV + module string + want bool + }{ + {name: "Target All includes Querier", target: flagext.StringSliceCSV{"all"}, module: Querier, want: true}, + {name: "Target Querier does not include Distributor", target: flagext.StringSliceCSV{"querier"}, module: Distributor, want: false}, + {name: "Target Read includes Query Frontend", target: flagext.StringSliceCSV{"read"}, module: QueryFrontend, want: true}, + {name: "Target Querier does not include Query Frontend", target: flagext.StringSliceCSV{"querier"}, module: QueryFrontend, want: false}, + {name: "Target Query Frontend does not include Querier", target: flagext.StringSliceCSV{"query-frontend"}, module: Querier, want: false}, + {name: "Multi target includes querier", target: flagext.StringSliceCSV{"query-frontend", "query-scheduler", "querier"}, module: Querier, want: true}, + {name: "Multi target does not include distributor", target: flagext.StringSliceCSV{"query-frontend", "query-scheduler", "querier"}, module: Distributor, want: false}, + {name: "Test recursive dep, Ingester -> TenantConfigs -> RuntimeConfig", target: flagext.StringSliceCSV{"ingester"}, module: RuntimeConfig, want: true}, + } + for _, tt := range tests { + t1.Run(tt.name, func(t1 *testing.T) { + t := &Loki{ + Cfg: Config{ + Target: tt.target, + }, + } + err := t.setupModuleManager() + assert.NoError(t1, err) + if got := t.isModuleActive(tt.module); got != tt.want { + t1.Errorf("isModuleActive() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index e3b37cfd759c..0f062be9c074 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -238,17 +238,30 @@ func (t *Loki) initQuerier() (services.Service, error) { "/loki/api/v1/label": http.HandlerFunc(t.Querier.LabelHandler), "/loki/api/v1/labels": http.HandlerFunc(t.Querier.LabelHandler), "/loki/api/v1/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler), - "/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler), "/loki/api/v1/series": http.HandlerFunc(t.Querier.SeriesHandler), "/api/prom/query": http.HandlerFunc(t.Querier.LogQueryHandler), "/api/prom/label": http.HandlerFunc(t.Querier.LabelHandler), "/api/prom/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler), - "/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler), "/api/prom/series": http.HandlerFunc(t.Querier.SeriesHandler), } + + // We always want to register tail routes externally, tail requests are different from normal queries, they + // are HTTP requests that get upgraded to websocket requests and need to be handled/kept open by the Queriers. + // The frontend has code to proxy these requests, however when running in the same processes + // (such as target=All or target=Read) we don't want the frontend to proxy and instead we want the Queriers + // to directly register these routes. + // In practice this means we always want the queriers to register the tail routes externally, when a querier + // is standalone ALL routes are registered externally, and when it's in the same process as a frontend, + // we disable the proxying of the tail routes in initQueryFrontend() and we still want these routes regiestered + // on the external router. + var alwaysExternalHandlers = map[string]http.Handler{ + "/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler), + "/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler), + } + return querier.InitWorkerService( - querierWorkerServiceConfig, queryHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware, + querierWorkerServiceConfig, queryHandlers, alwaysExternalHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware, ) } @@ -480,7 +493,8 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { ).Wrap(frontendHandler) var defaultHandler http.Handler - if t.Cfg.Frontend.TailProxyURL != "" { + // If this process also acts as a Querier we don't do any proxying of tail requests + if t.Cfg.Frontend.TailProxyURL != "" && !t.isModuleActive(Querier) { httpMiddleware := middleware.Merge( t.HTTPAuthMiddleware, queryrange.StatsHTTPMiddleware, @@ -512,9 +526,13 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { t.Server.HTTP.Path("/api/prom/label/{name}/values").Methods("GET", "POST").Handler(frontendHandler) t.Server.HTTP.Path("/api/prom/series").Methods("GET", "POST").Handler(frontendHandler) - // defer tail endpoints to the default handler - t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(defaultHandler) - t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler) + // Only register tailing requests if this process does not act as a Querier + // If this process is also a Querier the Querier will register the tail endpoints. + if !t.isModuleActive(Querier) { + // defer tail endpoints to the default handler + t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(defaultHandler) + t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler) + } if t.frontend == nil { return services.NewIdleService(nil, func(_ error) error { diff --git a/pkg/querier/worker_service.go b/pkg/querier/worker_service.go index 9ce2ed1cc547..996c8e8b786b 100644 --- a/pkg/querier/worker_service.go +++ b/pkg/querier/worker_service.go @@ -46,16 +46,35 @@ type WorkerServiceConfig struct { func InitWorkerService( cfg WorkerServiceConfig, queryRoutesToHandlers map[string]http.Handler, + alwaysExternalRoutesToHandlers map[string]http.Handler, externalRouter *mux.Router, externalHandler http.Handler, authMiddleware middleware.Interface, ) (serve services.Service, err error) { + // Create a couple Middlewares used to handle panics, perform auth, and parse Form's in http request + internalMiddleware := middleware.Merge( + serverutil.RecoveryHTTPMiddleware, + authMiddleware, + serverutil.NewPrepopulateMiddleware(), + ) + // External middleware also needs to set JSON content type headers + externalMiddleware := middleware.Merge( + internalMiddleware, + serverutil.ResponseJSONMiddleware(), + ) + internalRouter := mux.NewRouter() for route, handler := range queryRoutesToHandlers { internalRouter.Path(route).Methods("GET", "POST").Handler(handler) } + // There are some routes which are always registered on the external router, add them now and + // wrap them with the externalMiddleware + for route, handler := range alwaysExternalRoutesToHandlers { + externalRouter.Path(route).Methods("GET", "POST").Handler(externalMiddleware.Wrap(handler)) + } + // If the querier is running standalone without the query-frontend or query-scheduler, we must register the internal // HTTP handler externally (as it's the only handler that needs to register on querier routes) and provide the // external Loki Server HTTP handler to the frontend worker to ensure requests it processes use the default @@ -70,7 +89,10 @@ func InitWorkerService( idx++ } - registerRoutesExternally(routes, externalRouter, internalRouter, authMiddleware) + // Register routes externally + for _, route := range routes { + externalRouter.Path(route).Methods("GET", "POST").Handler(externalMiddleware.Wrap(internalRouter)) + } //If no frontend or scheduler address has been configured, then there is no place for the //querier worker to request work from, so no need to start a worker service @@ -107,16 +129,7 @@ func InitWorkerService( return "internalQuerier" })) - // If queries are processed using the external HTTP Server, we need wrap the internal querier with - // HTTP router with middleware to parse the tenant ID from the HTTP header and inject it into the - // request context, as well as make sure any x-www-url-formencoded params are correctly parsed - httpMiddleware := middleware.Merge( - serverutil.RecoveryHTTPMiddleware, - authMiddleware, - serverutil.NewPrepopulateMiddleware(), - ) - - internalHandler = httpMiddleware.Wrap(internalHandler) + internalHandler = internalMiddleware.Wrap(internalHandler) //Querier worker's max concurrent requests must be the same as the querier setting (*cfg.QuerierWorkerConfig).MaxConcurrentRequests = cfg.QuerierMaxConcurrent @@ -131,19 +144,6 @@ func InitWorkerService( prometheus.DefaultRegisterer) } -func registerRoutesExternally(routes []string, externalRouter *mux.Router, internalHandler http.Handler, authMiddleware middleware.Interface) { - httpMiddleware := middleware.Merge( - serverutil.RecoveryHTTPMiddleware, - authMiddleware, - serverutil.NewPrepopulateMiddleware(), - serverutil.ResponseJSONMiddleware(), - ) - - for _, route := range routes { - externalRouter.Path(route).Methods("GET", "POST").Handler(httpMiddleware.Wrap(internalHandler)) - } -} - func querierRunningStandalone(cfg WorkerServiceConfig) bool { runningStandalone := !cfg.QueryFrontendEnabled && !cfg.QuerySchedulerEnabled && !cfg.ReadEnabled && !cfg.AllEnabled level.Debug(util_log.Logger).Log( diff --git a/pkg/querier/worker_service_test.go b/pkg/querier/worker_service_test.go index 39abb3c30840..3eadfc82e1e9 100644 --- a/pkg/querier/worker_service_test.go +++ b/pkg/querier/worker_service_test.go @@ -22,6 +22,13 @@ func Test_InitQuerierService(t *testing.T) { }), } + var alwaysExternalHandlers = map[string]http.Handler{ + "/loki/api/v1/tail": http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + _, err := res.Write([]byte("test tail handler")) + require.NoError(t, err) + }), + } + testContext := func(config WorkerServiceConfig, authMiddleware middleware.Interface) (*mux.Router, services.Service) { externalRouter := mux.NewRouter() @@ -32,6 +39,7 @@ func Test_InitQuerierService(t *testing.T) { querierWorkerService, err := InitWorkerService( config, mockQueryHandlers, + alwaysExternalHandlers, externalRouter, http.HandlerFunc(externalRouter.ServeHTTP), authMiddleware, @@ -57,6 +65,13 @@ func Test_InitQuerierService(t *testing.T) { externalRouter.ServeHTTP(recorder, request) assert.Equal(t, 200, recorder.Code) assert.Equal(t, "test handler", recorder.Body.String()) + + // Tail endpoints always external + recorder = httptest.NewRecorder() + request = httptest.NewRequest("GET", "/loki/api/v1/tail", nil) + externalRouter.ServeHTTP(recorder, request) + assert.Equal(t, 200, recorder.Code) + assert.Equal(t, "test tail handler", recorder.Body.String()) }) t.Run("wrap external handler with auth middleware", func(t *testing.T) { @@ -187,6 +202,13 @@ func Test_InitQuerierService(t *testing.T) { request := httptest.NewRequest("GET", "/loki/api/v1/query", nil) externalRouter.ServeHTTP(recorder, request) assert.Equal(t, 404, recorder.Code) + + // Tail endpoints always external + recorder = httptest.NewRecorder() + request = httptest.NewRequest("GET", "/loki/api/v1/tail", nil) + externalRouter.ServeHTTP(recorder, request) + assert.Equal(t, 200, recorder.Code) + assert.Equal(t, "test tail handler", recorder.Body.String()) } })