From d0c3d0e782f72b4baf917bdafd45365e95724f98 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Sat, 13 Jun 2020 10:57:02 +0530 Subject: [PATCH 1/2] query tee proxy with support for comparison of responses --- cmd/querytee/main.go | 77 +++++++++++++++++ cmd/querytee/response_comparator.go | 69 +++++++++++++++ cmd/querytee/response_comparator_test.go | 102 +++++++++++++++++++++++ 3 files changed, 248 insertions(+) create mode 100644 cmd/querytee/main.go create mode 100644 cmd/querytee/response_comparator.go create mode 100644 cmd/querytee/response_comparator_test.go diff --git a/cmd/querytee/main.go b/cmd/querytee/main.go new file mode 100644 index 000000000000..0ea26142f238 --- /dev/null +++ b/cmd/querytee/main.go @@ -0,0 +1,77 @@ +package main + +import ( + "flag" + "os" + + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/common/logging" + "github.com/weaveworks/common/server" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/tools/querytee" + + "github.com/grafana/loki/pkg/loghttp" +) + +type Config struct { + ServerMetricsPort int + LogLevel logging.Level + ProxyConfig querytee.ProxyConfig +} + +func main() { + // Parse CLI flags. + cfg := Config{} + flag.IntVar(&cfg.ServerMetricsPort, "server.metrics-port", 9900, "The port where metrics are exposed.") + cfg.LogLevel.RegisterFlags(flag.CommandLine) + cfg.ProxyConfig.RegisterFlags(flag.CommandLine) + flag.Parse() + + util.InitLogger(&server.Config{ + LogLevel: cfg.LogLevel, + }) + + // Run the instrumentation server. + registry := prometheus.NewRegistry() + registry.MustRegister(prometheus.NewGoCollector()) + + i := querytee.NewInstrumentationServer(cfg.ServerMetricsPort, registry) + if err := i.Start(); err != nil { + level.Error(util.Logger).Log("msg", "Unable to start instrumentation server", "err", err.Error()) + os.Exit(1) + } + + // Run the proxy. + proxy, err := querytee.NewProxy(cfg.ProxyConfig, util.Logger, lokiReadRoutes(), registry) + if err != nil { + level.Error(util.Logger).Log("msg", "Unable to initialize the proxy", "err", err.Error()) + os.Exit(1) + } + + if err := proxy.Start(); err != nil { + level.Error(util.Logger).Log("msg", "Unable to start the proxy", "err", err.Error()) + os.Exit(1) + } + + proxy.Await() +} + +func lokiReadRoutes() []querytee.Route { + samplesComparator := querytee.NewSamplesComparator() + samplesComparator.RegisterSamplesType(loghttp.ResultTypeStream, compareStreams) + + return []querytee.Route{ + {Path: "/loki/api/v1/query_range", RouteName: "api_v1_query_range", Methods: "GET", ResponseComparator: samplesComparator}, + {Path: "/loki/api/v1/query", RouteName: "api_v1_query", Methods: "GET", ResponseComparator: samplesComparator}, + {Path: "/loki/api/v1/label", RouteName: "api_v1_label", Methods: "GET", ResponseComparator: nil}, + {Path: "/loki/api/v1/labels", RouteName: "api_v1_labels", Methods: "GET", ResponseComparator: nil}, + {Path: "/loki/api/v1/label/{name}/values", RouteName: "api_v1_label_name_values", Methods: "GET", ResponseComparator: nil}, + {Path: "/loki/api/v1/series", RouteName: "api_v1_series", Methods: "GET", ResponseComparator: nil}, + {Path: "/api/prom/query", RouteName: "api_prom_query", Methods: "GET", ResponseComparator: samplesComparator}, + {Path: "/api/prom/label", RouteName: "api_prom_label", Methods: "GET", ResponseComparator: nil}, + {Path: "/api/prom/label/{name}/values", RouteName: "api_prom_label_name_values", Methods: "GET", ResponseComparator: nil}, + {Path: "/api/prom/series", RouteName: "api_prom_series", Methods: "GET", ResponseComparator: nil}, + } +} diff --git a/cmd/querytee/response_comparator.go b/cmd/querytee/response_comparator.go new file mode 100644 index 000000000000..08bcd7ef0cf8 --- /dev/null +++ b/cmd/querytee/response_comparator.go @@ -0,0 +1,69 @@ +package main + +import ( + "encoding/json" + "fmt" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" + + "github.com/grafana/loki/pkg/loghttp" +) + +func compareStreams(expectedRaw, actualRaw json.RawMessage) error { + var expected, actual loghttp.Streams + + err := json.Unmarshal(expectedRaw, &expected) + if err != nil { + return err + } + err = json.Unmarshal(actualRaw, &actual) + if err != nil { + return err + } + + if len(expected) != len(actual) { + return fmt.Errorf("expected %d streams but got %d", len(expected), len(actual)) + } + + streamLabelsToIndexMap := make(map[string]int, len(expected)) + for i, actualStream := range actual { + streamLabelsToIndexMap[actualStream.Labels.String()] = i + } + + for _, expectedStream := range expected { + actualStreamIndex, ok := streamLabelsToIndexMap[expectedStream.Labels.String()] + if !ok { + return fmt.Errorf("expected stream %s missing from actual response", expectedStream.Labels) + } + + actualStream := actual[actualStreamIndex] + expectedValuesLen := len(expectedStream.Entries) + actualValuesLen := len(actualStream.Entries) + + if expectedValuesLen != actualValuesLen { + err := fmt.Errorf("expected %d values for stream %s but got %d", expectedValuesLen, + expectedStream.Labels, actualValuesLen) + if expectedValuesLen > 0 && actualValuesLen > 0 { + level.Error(util.Logger).Log("msg", err.Error(), "oldest-expected-ts", expectedStream.Entries[0].Timestamp.UnixNano(), + "newest-expected-ts", expectedStream.Entries[expectedValuesLen-1].Timestamp.UnixNano(), + "oldest-actual-ts", actualStream.Entries[0].Timestamp.UnixNano(), "newest-actual-ts", actualStream.Entries[actualValuesLen-1].Timestamp.UnixNano()) + } + return err + } + + for i, expectedSamplePair := range expectedStream.Entries { + actualSamplePair := actualStream.Entries[i] + if !expectedSamplePair.Timestamp.Equal(actualSamplePair.Timestamp) { + return fmt.Errorf("expected timestamp %v but got %v for stream %s", expectedSamplePair.Timestamp.UnixNano(), + actualSamplePair.Timestamp.UnixNano(), expectedStream.Labels) + } + if expectedSamplePair.Line != actualSamplePair.Line { + return fmt.Errorf("expected line %s for timestamp %v but got %s for stream %s", expectedSamplePair.Line, + expectedSamplePair.Timestamp.UnixNano(), actualSamplePair.Line, expectedStream.Labels) + } + } + } + + return nil +} diff --git a/cmd/querytee/response_comparator_test.go b/cmd/querytee/response_comparator_test.go new file mode 100644 index 000000000000..f1f0da33de87 --- /dev/null +++ b/cmd/querytee/response_comparator_test.go @@ -0,0 +1,102 @@ +package main + +import ( + "encoding/json" + "errors" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCompareStreams(t *testing.T) { + for _, tc := range []struct { + name string + expected json.RawMessage + actual json.RawMessage + err error + }{ + { + name: "no streams", + expected: json.RawMessage(`[]`), + actual: json.RawMessage(`[]`), + }, + { + name: "no streams in actual response", + expected: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["1","1"]]} + ]`), + actual: json.RawMessage(`[]`), + err: errors.New("expected 1 streams but got 0"), + }, + { + name: "extra stream in actual response", + expected: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["1","1"]]} + ]`), + actual: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["1","1"]]}, + {"stream":{"foo1":"bar1"},"values":[["1","1"]]} + ]`), + err: errors.New("expected 1 streams but got 2"), + }, + { + name: "same number of streams but with different labels", + expected: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["1","1"]]} + ]`), + actual: json.RawMessage(`[ + {"stream":{"foo1":"bar1"},"values":[["1","1"]]} + ]`), + err: errors.New("expected stream {foo=\"bar\"} missing from actual response"), + }, + { + name: "difference in number of samples", + expected: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["1","1"],["2","2"]]} + ]`), + actual: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["1","1"]]} + ]`), + err: errors.New("expected 2 values for stream {foo=\"bar\"} but got 1"), + }, + { + name: "difference in sample timestamp", + expected: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["1","1"],["2","2"]]} + ]`), + actual: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["1","1"],["3","2"]]} + ]`), + err: errors.New("expected timestamp 2 but got 3 for stream {foo=\"bar\"}"), + }, + { + name: "difference in sample value", + expected: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["1","1"],["2","2"]]} + ]`), + actual: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["1","1"],["2","3"]]} + ]`), + err: errors.New("expected line 2 for timestamp 2 but got 3 for stream {foo=\"bar\"}"), + }, + { + name: "correct samples", + expected: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["1","1"],["2","2"]]} + ]`), + actual: json.RawMessage(`[ + {"stream":{"foo":"bar"},"values":[["1","1"],["2","2"]]} + ]`), + }, + } { + t.Run(tc.name, func(t *testing.T) { + err := compareStreams(tc.expected, tc.actual) + if tc.err == nil { + require.NoError(t, err) + return + } + require.Error(t, err) + require.Equal(t, tc.err.Error(), err.Error()) + }) + } +} From 517222b1ed444b976dced591bd08129283e413b6 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Tue, 16 Jun 2020 12:30:08 +0530 Subject: [PATCH 2/2] update modules --- .../cortexproject/cortex/tools/LICENSE | 13 ++ .../cortex/tools/querytee/instrumentation.go | 60 ++++++ .../cortex/tools/querytee/proxy.go | 184 ++++++++++++++++ .../cortex/tools/querytee/proxy_backend.go | 112 ++++++++++ .../cortex/tools/querytee/proxy_endpoint.go | 180 ++++++++++++++++ .../cortex/tools/querytee/proxy_metrics.go | 35 +++ .../tools/querytee/response_comparator.go | 201 ++++++++++++++++++ vendor/modules.txt | 1 + 8 files changed, 786 insertions(+) create mode 100644 vendor/github.com/cortexproject/cortex/tools/LICENSE create mode 100644 vendor/github.com/cortexproject/cortex/tools/querytee/instrumentation.go create mode 100644 vendor/github.com/cortexproject/cortex/tools/querytee/proxy.go create mode 100644 vendor/github.com/cortexproject/cortex/tools/querytee/proxy_backend.go create mode 100644 vendor/github.com/cortexproject/cortex/tools/querytee/proxy_endpoint.go create mode 100644 vendor/github.com/cortexproject/cortex/tools/querytee/proxy_metrics.go create mode 100644 vendor/github.com/cortexproject/cortex/tools/querytee/response_comparator.go diff --git a/vendor/github.com/cortexproject/cortex/tools/LICENSE b/vendor/github.com/cortexproject/cortex/tools/LICENSE new file mode 100644 index 000000000000..9cd1640bad17 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/tools/LICENSE @@ -0,0 +1,13 @@ +Copyright 2018 Weaveworks. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/vendor/github.com/cortexproject/cortex/tools/querytee/instrumentation.go b/vendor/github.com/cortexproject/cortex/tools/querytee/instrumentation.go new file mode 100644 index 000000000000..8b6267786a77 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/tools/querytee/instrumentation.go @@ -0,0 +1,60 @@ +package querytee + +import ( + "fmt" + "net" + "net/http" + + "github.com/go-kit/kit/log/level" + "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + + "github.com/cortexproject/cortex/pkg/util" +) + +type InstrumentationServer struct { + port int + registry *prometheus.Registry + srv *http.Server +} + +// NewInstrumentationServer returns a server exposing Prometheus metrics. +func NewInstrumentationServer(port int, registry *prometheus.Registry) *InstrumentationServer { + return &InstrumentationServer{ + port: port, + registry: registry, + } +} + +// Start the instrumentation server. +func (s *InstrumentationServer) Start() error { + // Setup listener first, so we can fail early if the port is in use. + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port)) + if err != nil { + return err + } + + router := mux.NewRouter() + router.Handle("/metrics", promhttp.HandlerFor(s.registry, promhttp.HandlerOpts{})) + + s.srv = &http.Server{ + Handler: router, + } + + go func() { + if err := s.srv.Serve(listener); err != nil { + level.Error(util.Logger).Log("msg", "metrics server terminated", "err", err) + } + }() + + return nil +} + +// Stop closes the instrumentation server. +func (s *InstrumentationServer) Stop() { + if s.srv != nil { + s.srv.Close() + s.srv = nil + } +} diff --git a/vendor/github.com/cortexproject/cortex/tools/querytee/proxy.go b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy.go new file mode 100644 index 000000000000..c5013fff8bc1 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy.go @@ -0,0 +1,184 @@ +package querytee + +import ( + "context" + "flag" + "fmt" + "net" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/gorilla/mux" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + errMinBackends = errors.New("at least 1 backend is required") +) + +type ProxyConfig struct { + ServerServicePort int + BackendEndpoints string + PreferredBackend string + BackendReadTimeout time.Duration + CompareResponses bool +} + +func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) { + f.IntVar(&cfg.ServerServicePort, "server.service-port", 80, "The port where the query-tee service listens to.") + f.StringVar(&cfg.BackendEndpoints, "backend.endpoints", "", "Comma separated list of backend endpoints to query.") + f.StringVar(&cfg.PreferredBackend, "backend.preferred", "", "The hostname of the preferred backend when selecting the response to send back to the client.") + f.DurationVar(&cfg.BackendReadTimeout, "backend.read-timeout", 90*time.Second, "The timeout when reading the response from a backend.") + f.BoolVar(&cfg.CompareResponses, "proxy.compare-responses", false, "Compare responses between preferred and secondary endpoints for supported routes.") +} + +type Route struct { + Path string + RouteName string + Methods string + ResponseComparator ResponsesComparator +} + +type Proxy struct { + cfg ProxyConfig + backends []*ProxyBackend + logger log.Logger + metrics *ProxyMetrics + routes []Route + + // The HTTP server used to run the proxy service. + srv *http.Server + srvListener net.Listener + + // Wait group used to wait until the server has done. + done sync.WaitGroup +} + +func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer prometheus.Registerer) (*Proxy, error) { + if cfg.CompareResponses && cfg.PreferredBackend == "" { + return nil, fmt.Errorf("when enabling comparion of results -backend.preferred flag must be set to hostname of preferred backend") + } + + p := &Proxy{ + cfg: cfg, + logger: logger, + metrics: NewProxyMetrics(registerer), + routes: routes, + } + + // Parse the backend endpoints (comma separated). + parts := strings.Split(cfg.BackendEndpoints, ",") + + for idx, part := range parts { + // Skip empty ones. + part = strings.TrimSpace(part) + if part == "" { + continue + } + + u, err := url.Parse(part) + if err != nil { + return nil, errors.Wrapf(err, "invalid backend endpoint %s", part) + } + + // The backend name is hardcoded as the backend hostname. + name := u.Hostname() + preferred := name == cfg.PreferredBackend + + // In tests we have the same hostname for all backends, so we also + // support a numeric preferred backend which is the index in the list + // of backends. + if preferredIdx, err := strconv.Atoi(cfg.PreferredBackend); err == nil { + preferred = preferredIdx == idx + } + + p.backends = append(p.backends, NewProxyBackend(name, u, cfg.BackendReadTimeout, preferred)) + } + + // At least 1 backend is required + if len(p.backends) < 1 { + return nil, errMinBackends + } + + if cfg.CompareResponses && len(p.backends) != 2 { + return nil, fmt.Errorf("when enabling comparison of results number of backends should be 2 exactly") + } + + // At least 2 backends are suggested + if len(p.backends) < 2 { + level.Warn(p.logger).Log("msg", "The proxy is running with only 1 backend. At least 2 backends are required to fulfil the purpose of the proxy and compare results.") + } + + return p, nil +} + +func (p *Proxy) Start() error { + // Setup listener first, so we can fail early if the port is in use. + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", p.cfg.ServerServicePort)) + if err != nil { + return err + } + + router := mux.NewRouter() + + // Health check endpoint. + router.Path("/").Methods("GET").Handler(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + + // register routes + var comparator ResponsesComparator + for _, route := range p.routes { + if p.cfg.CompareResponses { + comparator = route.ResponseComparator + } + router.Path(route.Path).Methods(route.Methods).Handler(NewProxyEndpoint(p.backends, route.RouteName, p.metrics, p.logger, comparator)) + } + + p.srvListener = listener + p.srv = &http.Server{ + ReadTimeout: 1 * time.Minute, + WriteTimeout: 2 * time.Minute, + Handler: router, + } + + // Run in a dedicated goroutine. + p.done.Add(1) + go func() { + defer p.done.Done() + + if err := p.srv.Serve(p.srvListener); err != nil { + level.Error(p.logger).Log("msg", "Proxy server failed", "err", err) + } + }() + + return nil +} + +func (p *Proxy) Stop() error { + if p.srv == nil { + return nil + } + + return p.srv.Shutdown(context.Background()) +} + +func (p *Proxy) Await() { + // Wait until terminated. + p.done.Wait() +} + +func (p *Proxy) Endpoint() string { + if p.srvListener == nil { + return "" + } + + return p.srvListener.Addr().String() +} diff --git a/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_backend.go b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_backend.go new file mode 100644 index 000000000000..de4ad318fbf5 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_backend.go @@ -0,0 +1,112 @@ +package querytee + +import ( + "context" + "io/ioutil" + "net" + "net/http" + "net/url" + "path" + "time" + + "github.com/pkg/errors" +) + +// ProxyBackend holds the information of a single backend. +type ProxyBackend struct { + name string + endpoint *url.URL + client *http.Client + timeout time.Duration + + // Whether this is the preferred backend from which picking up + // the response and sending it back to the client. + preferred bool +} + +// NewProxyBackend makes a new ProxyBackend +func NewProxyBackend(name string, endpoint *url.URL, timeout time.Duration, preferred bool) *ProxyBackend { + return &ProxyBackend{ + name: name, + endpoint: endpoint, + timeout: timeout, + preferred: preferred, + client: &http.Client{ + CheckRedirect: func(_ *http.Request, _ []*http.Request) error { + return errors.New("the query-tee proxy does not follow redirects") + }, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, // see https://github.com/golang/go/issues/13801 + IdleConnTimeout: 90 * time.Second, + }, + }, + } +} + +func (b *ProxyBackend) ForwardRequest(orig *http.Request) (int, []byte, error) { + req, err := b.createBackendRequest(orig) + if err != nil { + return 0, nil, err + } + + return b.doBackendRequest(req) +} + +func (b *ProxyBackend) createBackendRequest(orig *http.Request) (*http.Request, error) { + req, err := http.NewRequest(orig.Method, orig.URL.String(), nil) + if err != nil { + return nil, err + } + + // Replace the endpoint with the backend one. + req.URL.Scheme = b.endpoint.Scheme + req.URL.Host = b.endpoint.Host + + // Prepend the endpoint path to the request path. + req.URL.Path = path.Join(b.endpoint.Path, req.URL.Path) + + // Replace the auth: + // - If the endpoint has user and password, use it. + // - If the endpoint has user only, keep it and use the request password (if any). + // - If the endpoint has no user and no password, use the request auth (if any). + clientUser, clientPass, clientAuth := orig.BasicAuth() + endpointUser := b.endpoint.User.Username() + endpointPass, _ := b.endpoint.User.Password() + + if endpointUser != "" && endpointPass != "" { + req.SetBasicAuth(endpointUser, endpointPass) + } else if endpointUser != "" { + req.SetBasicAuth(endpointUser, clientPass) + } else if clientAuth { + req.SetBasicAuth(clientUser, clientPass) + } + + return req, nil +} + +func (b *ProxyBackend) doBackendRequest(req *http.Request) (int, []byte, error) { + // Honor the read timeout. + ctx, cancel := context.WithTimeout(context.Background(), b.timeout) + defer cancel() + + // Execute the request. + res, err := b.client.Do(req.WithContext(ctx)) + if err != nil { + return 0, nil, errors.Wrap(err, "executing backend request") + } + + // Read the entire response body. + defer res.Body.Close() + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return 0, nil, errors.Wrap(err, "reading backend response") + } + + return res.StatusCode, body, nil +} diff --git a/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_endpoint.go b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_endpoint.go new file mode 100644 index 000000000000..91121f35e7a5 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_endpoint.go @@ -0,0 +1,180 @@ +package querytee + +import ( + "fmt" + "net/http" + "strconv" + "sync" + "time" + + "github.com/cortexproject/cortex/pkg/util" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" +) + +type ResponsesComparator interface { + Compare(expected, actual []byte) error +} + +type ProxyEndpoint struct { + backends []*ProxyBackend + metrics *ProxyMetrics + logger log.Logger + comparator ResponsesComparator + + // The route name used to track metrics. + routeName string +} + +func NewProxyEndpoint(backends []*ProxyBackend, routeName string, metrics *ProxyMetrics, logger log.Logger, comparator ResponsesComparator) *ProxyEndpoint { + return &ProxyEndpoint{ + backends: backends, + routeName: routeName, + metrics: metrics, + logger: logger, + comparator: comparator, + } +} + +func (p *ProxyEndpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) { + level.Debug(p.logger).Log("msg", "Received request", "path", r.URL.Path, "query", r.URL.RawQuery) + + // Send the same request to all backends. + wg := sync.WaitGroup{} + wg.Add(len(p.backends)) + resCh := make(chan *backendResponse, len(p.backends)) + + for _, b := range p.backends { + b := b + + go func() { + defer wg.Done() + + start := time.Now() + status, body, err := b.ForwardRequest(r) + elapsed := time.Since(start) + + res := &backendResponse{ + backend: b, + status: status, + body: body, + err: err, + elapsed: elapsed, + } + resCh <- res + + // Log with a level based on the backend response. + lvl := level.Debug + if !res.succeeded() { + lvl = level.Warn + } + + lvl(p.logger).Log("msg", "Backend response", "path", r.URL.Path, "query", r.URL.RawQuery, "backend", b.name, "status", status, "elapsed", elapsed) + }() + } + + // Wait until all backend requests completed. + wg.Wait() + close(resCh) + + // Collect all responses and track metrics for each of them. + responses := make([]*backendResponse, 0, len(p.backends)) + for res := range resCh { + responses = append(responses, res) + + p.metrics.durationMetric.WithLabelValues(res.backend.name, r.Method, p.routeName, strconv.Itoa(res.statusCode())).Observe(res.elapsed.Seconds()) + } + + // Select the response to send back to the client. + downstreamRes := p.pickResponseForDownstream(responses) + if downstreamRes.err != nil { + http.Error(w, downstreamRes.err.Error(), http.StatusInternalServerError) + } else { + w.WriteHeader(downstreamRes.status) + if _, err := w.Write(downstreamRes.body); err != nil { + level.Warn(p.logger).Log("msg", "Unable to write response", "err", err) + } + } + + if p.comparator != nil { + go func() { + expectedResponse := responses[0] + actualResponse := responses[1] + if responses[1].backend.preferred { + expectedResponse, actualResponse = actualResponse, expectedResponse + } + + result := resultSuccess + err := p.compareResponses(expectedResponse, actualResponse) + if err != nil { + level.Error(util.Logger).Log("msg", "response comparison failed", "route-name", p.routeName, + "query", r.URL.RawQuery, "err", err) + result = resultFailed + } + + p.metrics.responsesComparedTotal.WithLabelValues(p.routeName, result).Inc() + }() + } +} + +func (p *ProxyEndpoint) pickResponseForDownstream(responses []*backendResponse) *backendResponse { + // Look for a successful response from the preferred backend. + for _, res := range responses { + if res.backend.preferred && res.succeeded() { + return res + } + } + + // Look for any other successful response. + for _, res := range responses { + if res.succeeded() { + return res + } + } + + // No successful response, so let's pick the first one. + return responses[0] +} + +func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backendResponse) error { + // compare response body only if we get a 200 + if expectedResponse.status != 200 { + return fmt.Errorf("skipped comparison of response because we got status code %d from preferred backend's response", expectedResponse.status) + } + + if actualResponse.status != 200 { + return fmt.Errorf("skipped comparison of response because we got status code %d from secondary backend's response", expectedResponse.status) + } + + if expectedResponse.status != actualResponse.status { + return fmt.Errorf("expected status code %d but got %d", expectedResponse.status, actualResponse.status) + } + + return p.comparator.Compare(expectedResponse.body, actualResponse.body) +} + +type backendResponse struct { + backend *ProxyBackend + status int + body []byte + err error + elapsed time.Duration +} + +func (r *backendResponse) succeeded() bool { + if r.err != nil { + return false + } + + // We consider the response successful if it's a 2xx or 4xx (but not 429). + return (r.status >= 200 && r.status < 300) || (r.status >= 400 && r.status < 500 && r.status != 429) +} + +func (r *backendResponse) statusCode() int { + if r.err != nil || r.status <= 0 { + return 500 + } + + return r.status +} diff --git a/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_metrics.go b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_metrics.go new file mode 100644 index 000000000000..26bef1e69d35 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_metrics.go @@ -0,0 +1,35 @@ +package querytee + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/instrument" +) + +const ( + resultSuccess = "success" + resultFailed = "fail" +) + +type ProxyMetrics struct { + durationMetric *prometheus.HistogramVec + responsesComparedTotal *prometheus.CounterVec +} + +func NewProxyMetrics(registerer prometheus.Registerer) *ProxyMetrics { + m := &ProxyMetrics{ + durationMetric: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex_querytee", + Name: "request_duration_seconds", + Help: "Time (in seconds) spent serving HTTP requests.", + Buckets: instrument.DefBuckets, + }, []string{"backend", "method", "route", "status_code"}), + responsesComparedTotal: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex_querytee", + Name: "responses_compared_total", + Help: "Total number of responses compared per route name by result", + }, []string{"route_name", "result"}), + } + + return m +} diff --git a/vendor/github.com/cortexproject/cortex/tools/querytee/response_comparator.go b/vendor/github.com/cortexproject/cortex/tools/querytee/response_comparator.go new file mode 100644 index 000000000000..025f47e6b545 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/tools/querytee/response_comparator.go @@ -0,0 +1,201 @@ +package querytee + +import ( + "encoding/json" + "fmt" + + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/common/model" + + "github.com/cortexproject/cortex/pkg/util" +) + +// SamplesComparatorFunc helps with comparing different types of samples coming from /api/v1/query and /api/v1/query_range routes. +type SamplesComparatorFunc func(expected, actual json.RawMessage) error + +type SamplesResponse struct { + Status string + Data struct { + ResultType string + Result json.RawMessage + } +} + +func NewSamplesComparator() *SamplesComparator { + return &SamplesComparator{map[string]SamplesComparatorFunc{ + "matrix": compareMatrix, + "vector": compareVector, + "scalar": compareScalar, + }} +} + +type SamplesComparator struct { + sampleTypesComparator map[string]SamplesComparatorFunc +} + +// RegisterSamplesComparator helps with registering custom sample types +func (s *SamplesComparator) RegisterSamplesType(samplesType string, comparator SamplesComparatorFunc) { + s.sampleTypesComparator[samplesType] = comparator +} + +func (s *SamplesComparator) Compare(expectedResponse, actualResponse []byte) error { + var expected, actual SamplesResponse + + err := json.Unmarshal(expectedResponse, &expected) + if err != nil { + return err + } + + err = json.Unmarshal(actualResponse, &actual) + if err != nil { + return err + } + + if expected.Status != actual.Status { + return fmt.Errorf("expected status %s but got %s", expected.Status, actual.Status) + } + + if expected.Data.ResultType != actual.Data.ResultType { + return fmt.Errorf("expected resultType %s but got %s", expected.Data.ResultType, actual.Data.ResultType) + } + + comparator, ok := s.sampleTypesComparator[expected.Data.ResultType] + if !ok { + return fmt.Errorf("resultType %s not registered for comparison", expected.Data.ResultType) + } + + return comparator(expected.Data.Result, actual.Data.Result) +} + +func compareMatrix(expectedRaw, actualRaw json.RawMessage) error { + var expected, actual model.Matrix + + err := json.Unmarshal(expectedRaw, &expected) + if err != nil { + return err + } + err = json.Unmarshal(actualRaw, &actual) + if err != nil { + return err + } + + if len(expected) != len(actual) { + return fmt.Errorf("expected %d metrics but got %d", len(expected), + len(actual)) + } + + metricFingerprintToIndexMap := make(map[model.Fingerprint]int, len(expected)) + for i, actualMetric := range actual { + metricFingerprintToIndexMap[actualMetric.Metric.Fingerprint()] = i + } + + for _, expectedMetric := range expected { + actualMetricIndex, ok := metricFingerprintToIndexMap[expectedMetric.Metric.Fingerprint()] + if !ok { + return fmt.Errorf("expected metric %s missing from actual response", expectedMetric.Metric) + } + + actualMetric := actual[actualMetricIndex] + expectedMetricLen := len(expectedMetric.Values) + actualMetricLen := len(actualMetric.Values) + + if expectedMetricLen != actualMetricLen { + err := fmt.Errorf("expected %d samples for metric %s but got %d", expectedMetricLen, + expectedMetric.Metric, actualMetricLen) + if expectedMetricLen > 0 && actualMetricLen > 0 { + level.Error(util.Logger).Log("msg", err.Error(), "oldest-expected-ts", expectedMetric.Values[0].Timestamp, + "newest-expected-ts", expectedMetric.Values[expectedMetricLen-1].Timestamp, + "oldest-actual-ts", actualMetric.Values[0].Timestamp, "newest-actual-ts", actualMetric.Values[actualMetricLen-1].Timestamp) + } + return err + } + + for i, expectedSamplePair := range expectedMetric.Values { + actualSamplePair := actualMetric.Values[i] + err := compareSamplePair(expectedSamplePair, actualSamplePair) + if err != nil { + return errors.Wrapf(err, "sample pair not matching for metric %s", expectedMetric.Metric) + } + } + } + + return nil +} + +func compareVector(expectedRaw, actualRaw json.RawMessage) error { + var expected, actual model.Vector + + err := json.Unmarshal(expectedRaw, &expected) + if err != nil { + return err + } + + err = json.Unmarshal(actualRaw, &actual) + if err != nil { + return err + } + + if len(expected) != len(actual) { + return fmt.Errorf("expected %d metrics but got %d", len(expected), + len(actual)) + } + + metricFingerprintToIndexMap := make(map[model.Fingerprint]int, len(expected)) + for i, actualMetric := range actual { + metricFingerprintToIndexMap[actualMetric.Metric.Fingerprint()] = i + } + + for _, expectedMetric := range expected { + actualMetricIndex, ok := metricFingerprintToIndexMap[expectedMetric.Metric.Fingerprint()] + if !ok { + return fmt.Errorf("expected metric %s missing from actual response", expectedMetric.Metric) + } + + actualMetric := actual[actualMetricIndex] + err := compareSamplePair(model.SamplePair{ + Timestamp: expectedMetric.Timestamp, + Value: expectedMetric.Value, + }, model.SamplePair{ + Timestamp: actualMetric.Timestamp, + Value: actualMetric.Value, + }) + if err != nil { + return errors.Wrapf(err, "sample pair not matching for metric %s", expectedMetric.Metric) + } + } + + return nil +} + +func compareScalar(expectedRaw, actualRaw json.RawMessage) error { + var expected, actual model.Scalar + err := json.Unmarshal(expectedRaw, &expected) + if err != nil { + return err + } + + err = json.Unmarshal(actualRaw, &actual) + if err != nil { + return err + } + + return compareSamplePair(model.SamplePair{ + Timestamp: expected.Timestamp, + Value: expected.Value, + }, model.SamplePair{ + Timestamp: actual.Timestamp, + Value: actual.Value, + }) +} + +func compareSamplePair(expected, actual model.SamplePair) error { + if expected.Timestamp != actual.Timestamp { + return fmt.Errorf("expected timestamp %v but got %v", expected.Timestamp, actual.Timestamp) + } + if expected.Value != actual.Value { + return fmt.Errorf("expected value %s for timestamp %v but got %s", expected.Value, expected.Timestamp, actual.Value) + } + + return nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index df4797ca1d7f..874ceba742db 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -223,6 +223,7 @@ github.com/cortexproject/cortex/pkg/util/spanlogger github.com/cortexproject/cortex/pkg/util/test github.com/cortexproject/cortex/pkg/util/tls github.com/cortexproject/cortex/pkg/util/validation +github.com/cortexproject/cortex/tools/querytee # github.com/davecgh/go-spew v1.1.1 github.com/davecgh/go-spew/spew # github.com/dgrijalva/jwt-go v3.2.0+incompatible