diff --git a/pkg/controller/scheduler.go b/pkg/controller/scheduler.go index 25cdabb6c..ec16b6023 100644 --- a/pkg/controller/scheduler.go +++ b/pkg/controller/scheduler.go @@ -13,6 +13,10 @@ import ( "github.com/weaveworks/flagger/pkg/router" ) +const ( + MetricsProviderServiceSuffix = ":service" +) + // scheduleCanaries synchronises the canary map with the jobs map, // for new canaries new jobs are created and started // for the removed canaries the jobs are stopped and deleted @@ -747,10 +751,19 @@ func (c *Controller) analyseCanary(r *flaggerv1.Canary) bool { if r.Spec.Provider != "" { metricsProvider = r.Spec.Provider - // set the metrics server to Linkerd Prometheus when Linkerd is the default mesh provider + // set the metrics provider to Linkerd Prometheus when Linkerd is the default mesh provider if strings.Contains(c.meshProvider, "linkerd") { metricsProvider = "linkerd" } + + // set the metrics provider to Envoy Prometheus when Envoy is the default mesh provider + if strings.Contains(c.meshProvider, "envoy") { + metricsProvider = "envoy" + } + } + // set the metrics provider to query Prometheus for the canary Kubernetes service if the canary target is Service + if r.Spec.TargetRef.Kind == "Service" { + metricsProvider = metricsProvider + MetricsProviderServiceSuffix } // create observer based on the mesh provider @@ -761,7 +774,7 @@ func (c *Controller) analyseCanary(r *flaggerv1.Canary) bool { if r.Spec.MetricsServer != "" { metricsServer = r.Spec.MetricsServer var err error - observerFactory, err = metrics.NewFactory(metricsServer, metricsProvider, 5*time.Second) + observerFactory, err = metrics.NewFactory(metricsServer, 5*time.Second) if err != nil { c.recordEventErrorf(r, "Error building Prometheus client for %s %v", r.Spec.MetricsServer, err) return false diff --git a/pkg/metrics/envoy_service.go b/pkg/metrics/envoy_service.go new file mode 100644 index 000000000..5e9d26c37 --- /dev/null +++ b/pkg/metrics/envoy_service.go @@ -0,0 +1,73 @@ +package metrics + +import ( + "time" +) + +var envoyServiceQueries = map[string]string{ + "request-success-rate": ` + sum( + rate( + envoy_cluster_upstream_rq{ + kubernetes_namespace="{{ .Namespace }}", + envoy_cluster_name="{{ .Name }}-canary", + envoy_response_code!~"5.*" + }[{{ .Interval }}] + ) + ) + / + sum( + rate( + envoy_cluster_upstream_rq{ + kubernetes_namespace="{{ .Namespace }}", + envoy_cluster_name="{{ .Name }}-canary" + }[{{ .Interval }}] + ) + ) + * 100`, + "request-duration": ` + histogram_quantile( + 0.99, + sum( + rate( + envoy_cluster_upstream_rq_time_bucket{ + kubernetes_namespace="{{ .Namespace }}", + envoy_cluster_name="{{ .Name }}-canary" + }[{{ .Interval }}] + ) + ) by (le) + )`, +} + +type EnvoyServiceObserver struct { + client *PrometheusClient +} + +func (ob *EnvoyServiceObserver) GetRequestSuccessRate(name string, namespace string, interval string) (float64, error) { + query, err := ob.client.RenderQuery(name, namespace, interval, envoyServiceQueries["request-success-rate"]) + if err != nil { + return 0, err + } + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err + } + + return value, nil +} + +func (ob *EnvoyServiceObserver) GetRequestDuration(name string, namespace string, interval string) (time.Duration, error) { + query, err := ob.client.RenderQuery(name, namespace, interval, envoyServiceQueries["request-duration"]) + if err != nil { + return 0, err + } + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err + } + + ms := time.Duration(int64(value)) * time.Millisecond + return ms, nil +} diff --git a/pkg/metrics/envoy_service_test.go b/pkg/metrics/envoy_service_test.go new file mode 100644 index 000000000..fca854bd0 --- /dev/null +++ b/pkg/metrics/envoy_service_test.go @@ -0,0 +1,74 @@ +package metrics + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestEnvoyServiceObserver_GetRequestSuccessRate(t *testing.T) { + expected := ` sum( rate( envoy_cluster_upstream_rq{ kubernetes_namespace="default", envoy_cluster_name="podinfo-canary", envoy_response_code!~"5.*" }[1m] ) ) / sum( rate( envoy_cluster_upstream_rq{ kubernetes_namespace="default", envoy_cluster_name="podinfo-canary" }[1m] ) ) * 100` + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + promql := r.URL.Query()["query"][0] + if promql != expected { + t.Errorf("\nGot %s \nWanted %s", promql, expected) + } + + json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1,"100"]}]}}` + w.Write([]byte(json)) + })) + defer ts.Close() + + client, err := NewPrometheusClient(ts.URL, time.Second) + if err != nil { + t.Fatal(err) + } + + observer := &EnvoyServiceObserver{ + client: client, + } + + val, err := observer.GetRequestSuccessRate("podinfo", "default", "1m") + if err != nil { + t.Fatal(err.Error()) + } + + if val != 100 { + t.Errorf("Got %v wanted %v", val, 100) + } +} + +func TestEnvoyServiceObserver_GetRequestDuration(t *testing.T) { + expected := ` histogram_quantile( 0.99, sum( rate( envoy_cluster_upstream_rq_time_bucket{ kubernetes_namespace="default", envoy_cluster_name="podinfo-canary" }[1m] ) ) by (le) )` + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + promql := r.URL.Query()["query"][0] + if promql != expected { + t.Errorf("\nGot %s \nWanted %s", promql, expected) + } + + json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1,"100"]}]}}` + w.Write([]byte(json)) + })) + defer ts.Close() + + client, err := NewPrometheusClient(ts.URL, time.Second) + if err != nil { + t.Fatal(err) + } + + observer := &EnvoyServiceObserver{ + client: client, + } + + val, err := observer.GetRequestDuration("podinfo", "default", "1m") + if err != nil { + t.Fatal(err.Error()) + } + + if val != 100*time.Millisecond { + t.Errorf("Got %v wanted %v", val, 100*time.Millisecond) + } +} diff --git a/pkg/metrics/factory.go b/pkg/metrics/factory.go index e2b69b8d8..ce6d85d07 100644 --- a/pkg/metrics/factory.go +++ b/pkg/metrics/factory.go @@ -6,19 +6,17 @@ import ( ) type Factory struct { - MeshProvider string - Client *PrometheusClient + Client *PrometheusClient } -func NewFactory(metricsServer string, meshProvider string, timeout time.Duration) (*Factory, error) { +func NewFactory(metricsServer string, timeout time.Duration) (*Factory, error) { client, err := NewPrometheusClient(metricsServer, timeout) if err != nil { return nil, err } return &Factory{ - MeshProvider: meshProvider, - Client: client, + Client: client, }, nil } @@ -32,7 +30,7 @@ func (factory Factory) Observer(provider string) Interface { return &HttpObserver{ client: factory.Client, } - case provider == "appmesh": + case provider == "appmesh", provider == "envoy": return &EnvoyObserver{ client: factory.Client, } @@ -44,8 +42,8 @@ func (factory Factory) Observer(provider string) Interface { return &GlooObserver{ client: factory.Client, } - case provider == "smi:linkerd": - return &LinkerdObserver{ + case provider == "appmesh:service", provider == "envoy:service": + return &EnvoyServiceObserver{ client: factory.Client, } case provider == "linkerd":