From 6384ec13ec77a86c5028818299233ca2ba1ffb8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Lambert?= Date: Mon, 20 Jun 2022 13:22:00 -0400 Subject: [PATCH 1/3] Fix cooldown feature to properly manage several instances of emissary MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Kévin Lambert --- CHANGELOG.md | 15 +++++++- docs/releaseNotes.yml | 17 +++++++++ pkg/agent/agent.go | 68 +++++++++++++++++++++------------ pkg/agent/agent_metrics_test.go | 28 +++++++++++--- 4 files changed, 97 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 10cae51ec4..cb1be721bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -80,8 +80,12 @@ it will be removed; but as it won't be user-visible this isn't considered a brea - Feature: The agent is now able to parse api contracts using swagger 2, and to convert them to OpenAPI 3, making them available for use in the dev portal. +- Bugfix: A regression was introduced in 2.3.0 causing the agent to miss some of the metrics coming + from emissary ingress before sending them to Ambassador cloud. This issue has been resolved to + ensure that all the nodes composing the emissary ingress cluster are reporting properly. + ## [3.0.0] June 27, 2022 -[3.0.0]: https://github.com/emissary-ingress/emissary/compare/v2.3.1...v3.0.0 +[3.0.0]: https://github.com/emissary-ingress/emissary/compare/v2.3.2...v3.0.0 ### Emissary-ingress and Ambassador Edge Stack @@ -151,6 +155,15 @@ it will be removed; but as it won't be user-visible this isn't considered a brea HTTP/3 connections using QUIC and the UDP network protocol. It currently only supports for connections between downstream clients and Emissary-ingress. +## [2.3.2] TBD +[2.3.2]: https://github.com/emissary-ingress/emissary/compare/v2.3.1...v2.3.2 + +### Emissary-ingress and Ambassador Edge Stack + +- Bugfix: A regression was introduced in 2.3.0 causing the agent to miss some of the metrics coming + from emissary ingress before sending them to Ambassador cloud. This issue has been resolved to + ensure that all the nodes composing the emissary ingress cluster are reporting properly. + ## [2.3.1] June 09, 2022 [2.3.1]: https://github.com/emissary-ingress/emissary/compare/v2.3.0...v2.3.1 diff --git a/docs/releaseNotes.yml b/docs/releaseNotes.yml index 338c670e37..82b1bc705e 100644 --- a/docs/releaseNotes.yml +++ b/docs/releaseNotes.yml @@ -41,6 +41,13 @@ items: The agent is now able to parse api contracts using swagger 2, and to convert them to OpenAPI 3, making them available for use in the dev portal. + - title: fix regression in the agent for the metrics transfer. + type: bugfix + body: >- + A regression was introduced in 2.3.0 causing the agent to miss some of the metrics coming from + emissary ingress before sending them to Ambassador cloud. This issue has been resolved to ensure + that all the nodes composing the emissary ingress cluster are reporting properly. + - version: 3.0.0 date: '2022-06-27' notes: @@ -134,6 +141,16 @@ items: With the ugprade to Envoy 1.22, $productName$ can now be configured to listen for HTTP/3 connections using QUIC and the UDP network protocol. It currently only supports for connections between downstream clients and $productName$. + - version: 2.3.2 + date: 'TBD' + notes: + - title: fix regression in the agent for the metrics transfer. + type: bugfix + body: >- + A regression was introduced in 2.3.0 causing the agent to miss some of the metrics coming from + emissary ingress before sending them to Ambassador cloud. This issue has been resolved to ensure + that all the nodes composing the emissary ingress cluster are reporting properly. + - version: 2.3.1 date: '2022-06-09' notes: diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 8c90165fb6..9ad17b9d7e 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "io/ioutil" "net/http" "net/url" @@ -14,6 +15,7 @@ import ( "time" io_prometheus_client "github.com/prometheus/client_model/go" + "google.golang.org/grpc/peer" "google.golang.org/protobuf/types/known/timestamppb" "k8s.io/apimachinery/pkg/runtime/schema" @@ -114,6 +116,9 @@ type Agent struct { // Cloud API. metricsBackoffUntil time.Time + // Used to accumulate metrics for a same timestamp before pushing them to the cloud. + metricStack map[string][]*io_prometheus_client.MetricFamily + // Extra headers to inject into RPC requests to ambassador cloud. rpcExtraHeaders []string @@ -181,8 +186,9 @@ func NewAgent(directiveHandler DirectiveHandler, rolloutsGetterFactory rolloutsG directiveHandler: directiveHandler, reportRunning: atomicBool{value: false}, agentWatchFieldSelector: getEnvWithDefault("AGENT_WATCH_FIELD_SELECTOR", "metadata.namespace!=kube-system"), - metricsBackoffUntil: time.Now(), + metricsBackoffUntil: time.Now().Add(defaultMinReportPeriod), rpcExtraHeaders: rpcExtraHeaders, + metricStack: map[string][]*io_prometheus_client.MetricFamily{}, } } @@ -809,56 +815,70 @@ var allowedMetricsSuffixes = []string{"upstream_rq_total", "upstream_rq_time", " // MetricsRelayHandler is invoked as a callback when the agent receive metrics from Envoy (sink). func (a *Agent) MetricsRelayHandler( - logCtx context.Context, + ctx context.Context, in *envoyMetrics.StreamMetricsMessage, ) { - a.metricsRelayMutex.Lock() - defer a.metricsRelayMutex.Unlock() - metrics := in.GetEnvoyMetrics() - metricCount := len(metrics) - - if !time.Now().After(a.metricsBackoffUntil) { - dlog.Debugf(logCtx, "Drop %d metric(s); next push scheduled for %s", - metricCount, a.metricsBackoffUntil.String()) - return - } if a.comm != nil && !a.reportingStopped { - dlog.Infof(logCtx, "Received %d metric(s)", metricCount) - a.ambassadorAPIKeyMutex.Lock() apikey := a.ambassadorAPIKey a.ambassadorAPIKeyMutex.Unlock() - outMetrics := make([]*io_prometheus_client.MetricFamily, 0, len(metrics)) + newMetrics := make([]*io_prometheus_client.MetricFamily, 0, len(metrics)) for _, metricFamily := range metrics { for _, suffix := range allowedMetricsSuffixes { if strings.HasSuffix(metricFamily.GetName(), suffix) { - outMetrics = append(outMetrics, metricFamily) + newMetrics = append(newMetrics, metricFamily) break } } } - outMessage := &agent.StreamMetricsMessage{ - Identity: a.agentID, - EnvoyMetrics: outMetrics, + p, ok := peer.FromContext(ctx) + + if !ok { + dlog.Errorf(ctx, "peer not found in context") + return } - if relayedMetricCount := len(outMessage.GetEnvoyMetrics()); relayedMetricCount > 0 { + instanceID := p.Addr.String() + + a.metricsRelayMutex.Lock() + defer a.metricsRelayMutex.Unlock() + // Collect metrics until next report. + if time.Now().Before(a.metricsBackoffUntil) { + dlog.Infof(ctx, "Append %d metric(s) to stack from %s", + len(newMetrics), instanceID, + ) + a.metricStack[instanceID] = newMetrics + } else { + // Otherwise, we reached a new batch of metric, send everything. + outMessage := &agent.StreamMetricsMessage{ + Identity: a.agentID, + EnvoyMetrics: []*io_prometheus_client.MetricFamily{}, + } + + for key, instanceMetrics := range a.metricStack { + outMessage.EnvoyMetrics = append(outMessage.EnvoyMetrics, instanceMetrics...) + delete(a.metricStack, key) + } + + if relayedMetricCount := len(outMessage.GetEnvoyMetrics()); relayedMetricCount > 0 { - dlog.Infof(logCtx, "Relaying %d metric(s)", relayedMetricCount) + dlog.Infof(ctx, "Relaying %d metric(s)", relayedMetricCount) - if err := a.comm.StreamMetrics(logCtx, outMessage, apikey); err != nil { - dlog.Errorf(logCtx, "error streaming metric(s): %+v", err) + if err := a.comm.StreamMetrics(ctx, outMessage, apikey); err != nil { + dlog.Errorf(ctx, "error streaming metric(s): %+v", err) + } } + // Configure next push. a.metricsBackoffUntil = time.Now().Add(defaultMinReportPeriod) - dlog.Infof(logCtx, "Next metrics relay scheduled for %s", + dlog.Infof(ctx, "Next metrics relay scheduled for %s", a.metricsBackoffUntil.UTC().String()) } } diff --git a/pkg/agent/agent_metrics_test.go b/pkg/agent/agent_metrics_test.go index f4e26820f5..9f8e14a5ff 100644 --- a/pkg/agent/agent_metrics_test.go +++ b/pkg/agent/agent_metrics_test.go @@ -6,6 +6,8 @@ import ( envoyMetrics "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/metrics/v3" io_prometheus_client "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" + "google.golang.org/grpc/peer" + "net" "testing" "time" ) @@ -46,6 +48,7 @@ func agentMetricsSetupTest() (*MockClient, *Agent) { comm: &RPCComm{ client: clientMock, }, + metricStack: map[string][]*io_prometheus_client.MetricFamily{}, } return clientMock, stubbedAgent @@ -53,26 +56,36 @@ func agentMetricsSetupTest() (*MockClient, *Agent) { func TestMetricsRelayHandler(t *testing.T) { - t.Run("will relay the metrics", func(t *testing.T) { + t.Run("will relay metrics from the stack", func(t *testing.T) { //given clientMock, stubbedAgent := agentMetricsSetupTest() - ctx := dlog.NewTestContext(t, true) + ctx := peer.NewContext(dlog.NewTestContext(t, true), &peer.Peer{ + Addr: &net.IPAddr{ + IP: net.ParseIP("192.168.0.1"), + }, + }) + stubbedAgent.metricStack["192.168.0.1"] = []*io_prometheus_client.MetricFamily{acceptedMetric} //when stubbedAgent.MetricsRelayHandler(ctx, &envoyMetrics.StreamMetricsMessage{ - Identifier: nil, + Identifier: nil, + // ignored since time to report. EnvoyMetrics: []*io_prometheus_client.MetricFamily{ignoredMetric, acceptedMetric}, }) //then assert.Equal(t, []*agent.StreamMetricsMessage{{ EnvoyMetrics: []*io_prometheus_client.MetricFamily{acceptedMetric}, - }}, clientMock.SentMetrics) + }}, clientMock.SentMetrics, "metrics should be propagated to cloud") }) t.Run("will not relay the metrics since it is in cool down period.", func(t *testing.T) { //given clientMock, stubbedAgent := agentMetricsSetupTest() - ctx := dlog.NewTestContext(t, true) + ctx := peer.NewContext(dlog.NewTestContext(t, true), &peer.Peer{ + Addr: &net.IPAddr{ + IP: net.ParseIP("192.168.0.1"), + }, + }) stubbedAgent.metricsBackoffUntil = time.Now().Add(defaultMinReportPeriod) //when @@ -82,6 +95,9 @@ func TestMetricsRelayHandler(t *testing.T) { }) //then - assert.Equal(t, 0, len(clientMock.SentMetrics)) + assert.Equal(t, stubbedAgent.metricStack["192.168.0.1"], + []*io_prometheus_client.MetricFamily{acceptedMetric}, + "metrics should be added to the stack") + assert.Equal(t, 0, len(clientMock.SentMetrics), "nothing send to cloud") }) } From c2b15be3ca30242bcd2480597a24adfd46f5863f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Lambert?= Date: Tue, 12 Jul 2022 18:22:43 -0400 Subject: [PATCH 2/3] Apply style guidelines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Kévin Lambert --- pkg/agent/agent.go | 61 +++++++++++++++++---------------- pkg/agent/agent_metrics_test.go | 6 ++-- 2 files changed, 34 insertions(+), 33 deletions(-) diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 9ad17b9d7e..61c88adde4 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -117,7 +117,7 @@ type Agent struct { metricsBackoffUntil time.Time // Used to accumulate metrics for a same timestamp before pushing them to the cloud. - metricStack map[string][]*io_prometheus_client.MetricFamily + aggregatedMetrics map[string][]*io_prometheus_client.MetricFamily // Extra headers to inject into RPC requests to ambassador cloud. rpcExtraHeaders []string @@ -188,7 +188,7 @@ func NewAgent(directiveHandler DirectiveHandler, rolloutsGetterFactory rolloutsG agentWatchFieldSelector: getEnvWithDefault("AGENT_WATCH_FIELD_SELECTOR", "metadata.namespace!=kube-system"), metricsBackoffUntil: time.Now().Add(defaultMinReportPeriod), rpcExtraHeaders: rpcExtraHeaders, - metricStack: map[string][]*io_prometheus_client.MetricFamily{}, + aggregatedMetrics: map[string][]*io_prometheus_client.MetricFamily{}, } } @@ -821,6 +821,12 @@ func (a *Agent) MetricsRelayHandler( metrics := in.GetEnvoyMetrics() if a.comm != nil && !a.reportingStopped { + p, ok := peer.FromContext(ctx) + + if !ok { + dlog.Errorf(ctx, "peer not found in context") + return + } a.ambassadorAPIKeyMutex.Lock() apikey := a.ambassadorAPIKey @@ -837,13 +843,6 @@ func (a *Agent) MetricsRelayHandler( } } - p, ok := peer.FromContext(ctx) - - if !ok { - dlog.Errorf(ctx, "peer not found in context") - return - } - instanceID := p.Addr.String() a.metricsRelayMutex.Lock() @@ -853,34 +852,36 @@ func (a *Agent) MetricsRelayHandler( dlog.Infof(ctx, "Append %d metric(s) to stack from %s", len(newMetrics), instanceID, ) - a.metricStack[instanceID] = newMetrics - } else { - // Otherwise, we reached a new batch of metric, send everything. - outMessage := &agent.StreamMetricsMessage{ - Identity: a.agentID, - EnvoyMetrics: []*io_prometheus_client.MetricFamily{}, - } + a.aggregatedMetrics[instanceID] = newMetrics + return + } - for key, instanceMetrics := range a.metricStack { - outMessage.EnvoyMetrics = append(outMessage.EnvoyMetrics, instanceMetrics...) - delete(a.metricStack, key) - } + // Otherwise, we reached a new batch of metric, send everything. + outMessage := &agent.StreamMetricsMessage{ + Identity: a.agentID, + EnvoyMetrics: []*io_prometheus_client.MetricFamily{}, + } - if relayedMetricCount := len(outMessage.GetEnvoyMetrics()); relayedMetricCount > 0 { + for key, instanceMetrics := range a.aggregatedMetrics { + outMessage.EnvoyMetrics = append(outMessage.EnvoyMetrics, instanceMetrics...) + delete(a.aggregatedMetrics, key) + } - dlog.Infof(ctx, "Relaying %d metric(s)", relayedMetricCount) + if relayedMetricCount := len(outMessage.GetEnvoyMetrics()); relayedMetricCount > 0 { - if err := a.comm.StreamMetrics(ctx, outMessage, apikey); err != nil { - dlog.Errorf(ctx, "error streaming metric(s): %+v", err) - } + dlog.Infof(ctx, "Relaying %d metric(s)", relayedMetricCount) + + if err := a.comm.StreamMetrics(ctx, outMessage, apikey); err != nil { + dlog.Errorf(ctx, "error streaming metric(s): %+v", err) } + } - // Configure next push. - a.metricsBackoffUntil = time.Now().Add(defaultMinReportPeriod) + // Configure next push. + a.metricsBackoffUntil = time.Now().Add(defaultMinReportPeriod) + + dlog.Infof(ctx, "Next metrics relay scheduled for %s", + a.metricsBackoffUntil.UTC().String()) - dlog.Infof(ctx, "Next metrics relay scheduled for %s", - a.metricsBackoffUntil.UTC().String()) - } } } diff --git a/pkg/agent/agent_metrics_test.go b/pkg/agent/agent_metrics_test.go index 9f8e14a5ff..5ee296626a 100644 --- a/pkg/agent/agent_metrics_test.go +++ b/pkg/agent/agent_metrics_test.go @@ -48,7 +48,7 @@ func agentMetricsSetupTest() (*MockClient, *Agent) { comm: &RPCComm{ client: clientMock, }, - metricStack: map[string][]*io_prometheus_client.MetricFamily{}, + aggregatedMetrics: map[string][]*io_prometheus_client.MetricFamily{}, } return clientMock, stubbedAgent @@ -64,7 +64,7 @@ func TestMetricsRelayHandler(t *testing.T) { IP: net.ParseIP("192.168.0.1"), }, }) - stubbedAgent.metricStack["192.168.0.1"] = []*io_prometheus_client.MetricFamily{acceptedMetric} + stubbedAgent.aggregatedMetrics["192.168.0.1"] = []*io_prometheus_client.MetricFamily{acceptedMetric} //when stubbedAgent.MetricsRelayHandler(ctx, &envoyMetrics.StreamMetricsMessage{ @@ -95,7 +95,7 @@ func TestMetricsRelayHandler(t *testing.T) { }) //then - assert.Equal(t, stubbedAgent.metricStack["192.168.0.1"], + assert.Equal(t, stubbedAgent.aggregatedMetrics["192.168.0.1"], []*io_prometheus_client.MetricFamily{acceptedMetric}, "metrics should be added to the stack") assert.Equal(t, 0, len(clientMock.SentMetrics), "nothing send to cloud") From 45c3df1bdac262ed6868daba512b85311c0008cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Lambert?= Date: Tue, 12 Jul 2022 18:36:18 -0400 Subject: [PATCH 3/3] Add tests for edge cases MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Kévin Lambert --- pkg/agent/agent.go | 2 +- pkg/agent/agent_metrics_test.go | 34 +++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 61c88adde4..92e70fb365 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -824,7 +824,7 @@ func (a *Agent) MetricsRelayHandler( p, ok := peer.FromContext(ctx) if !ok { - dlog.Errorf(ctx, "peer not found in context") + dlog.Warnf(ctx, "peer not found in context") return } diff --git a/pkg/agent/agent_metrics_test.go b/pkg/agent/agent_metrics_test.go index 5ee296626a..3d460b8be5 100644 --- a/pkg/agent/agent_metrics_test.go +++ b/pkg/agent/agent_metrics_test.go @@ -100,4 +100,38 @@ func TestMetricsRelayHandler(t *testing.T) { "metrics should be added to the stack") assert.Equal(t, 0, len(clientMock.SentMetrics), "nothing send to cloud") }) + t.Run("peer IP is not available", func(t *testing.T) { + // given + clientMock, stubbedAgent := agentMetricsSetupTest() + ctx := dlog.NewTestContext(t, true) + + //when + stubbedAgent.MetricsRelayHandler(ctx, &envoyMetrics.StreamMetricsMessage{ + Identifier: nil, + EnvoyMetrics: []*io_prometheus_client.MetricFamily{acceptedMetric}, + }) + + //then + assert.Equal(t, 0, len(stubbedAgent.aggregatedMetrics), "no metrics") + assert.Equal(t, 0, len(clientMock.SentMetrics), "nothing send to cloud") + }) + t.Run("not metrics available in aggregatedMetrics", func(t *testing.T) { + // given + clientMock, stubbedAgent := agentMetricsSetupTest() + ctx := peer.NewContext(dlog.NewTestContext(t, true), &peer.Peer{ + Addr: &net.IPAddr{ + IP: net.ParseIP("192.168.0.1"), + }, + }) + + //when + stubbedAgent.MetricsRelayHandler(ctx, &envoyMetrics.StreamMetricsMessage{ + Identifier: nil, + EnvoyMetrics: []*io_prometheus_client.MetricFamily{}, + }) + + //then + assert.Equal(t, 0, len(stubbedAgent.aggregatedMetrics), "no metrics") + assert.Equal(t, 0, len(clientMock.SentMetrics), "nothing send to cloud") + }) }