Skip to content

Commit

Permalink
Merge pull request #4122 from emissary-ingress/klambert/add-metrics-s…
Browse files Browse the repository at this point in the history
…tream-delay

Add delay in the agent between each metric push
  • Loading branch information
Alice Wasko authored Apr 27, 2022
2 parents c846a42 + 842aaf1 commit 9098031
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 18 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ Please see the [Envoy documentation](https://www.envoyproxy.io/docs/envoy/latest
- Bugfix: Kubernetes Secrets that should contain an EC (Elliptic Curve) TLS Private Key are now
properly validated. ([4134])

- Change: The new delay between two metrics syncs is now 30s. ([#4122])

[4134]: https://github.com/emissary-ingress/emissary/issues/4134
[#4122]: https://github.com/emissary-ingress/emissary/pull/4122

## [2.2.1] February 22, 2022
[2.2.1]: https://github.com/emissary-ingress/emissary/compare/v2.2.0...v2.2.1
Expand Down
8 changes: 8 additions & 0 deletions docs/releaseNotes.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ items:
link: https://github.com/emissary-ingress/emissary/issues/4134
docs: https://github.com/emissary-ingress/emissary/issues/4134

- title: Decrease metric sync frequency
type: change
body: >-
The new delay between two metrics syncs is now 30s.
github:
- title: "#4122"
link: https://github.com/emissary-ingress/emissary/pull/4122

- version: 2.2.1
date: '2022-02-22'
notes:
Expand Down
64 changes: 58 additions & 6 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ type Agent struct {

// Field selector for the k8s resources that the agent watches
agentWatchFieldSelector string

// A mutex related to the metrics endpoint action, to avoid concurrent (and useless) pushes.
metricsRelayMutex sync.Mutex
// Timestamp to keep in memory to Prevent from making too many requests to the Ambassador
// Cloud API.
metricsBackoffUntil time.Time

// Extra headers to inject into RPC requests to ambassador cloud.
rpcExtraHeaders []string
}

func getEnvWithDefault(envVarKey string, defaultValue string) string {
Expand Down Expand Up @@ -135,6 +144,17 @@ func NewAgent(directiveHandler DirectiveHandler, rolloutsGetterFactory rolloutsG
}
}

var rpcExtraHeaders = make([]string, 0)

if os.Getenv("RPC_INTERCEPT_HEADER_KEY") != "" &&
os.Getenv("RPC_INTERCEPT_HEADER_VALUE") != "" {
rpcExtraHeaders = append(
rpcExtraHeaders,
os.Getenv("RPC_INTERCEPT_HEADER_KEY"),
os.Getenv("RPC_INTERCEPT_HEADER_VALUE"),
)
}

return &Agent{
minReportPeriod: reportPeriod,
reportComplete: make(chan error),
Expand All @@ -149,6 +169,8 @@ func NewAgent(directiveHandler DirectiveHandler, rolloutsGetterFactory rolloutsG
directiveHandler: directiveHandler,
reportRunning: atomicBool{value: false},
agentWatchFieldSelector: getEnvWithDefault("AGENT_WATCH_FIELD_SELECTOR", "metadata.namespace!=kube-system"),
metricsBackoffUntil: time.Now(),
rpcExtraHeaders: rpcExtraHeaders,
}
}

Expand Down Expand Up @@ -469,7 +491,9 @@ func (a *Agent) MaybeReport(ctx context.Context) {
// The communications channel to the DCP was not yet created or was
// closed above, due to a change in identity, or close elsewhere, due to
// a change in endpoint configuration.
newComm, err := NewComm(ctx, a.connInfo, a.agentID, a.ambassadorAPIKey)
newComm, err := NewComm(
ctx, a.connInfo, a.agentID, a.ambassadorAPIKey, a.rpcExtraHeaders)

if err != nil {
dlog.Warnf(ctx, "Failed to dial the DCP: %v", err)
dlog.Warn(ctx, "DCP functionality disabled until next retry")
Expand Down Expand Up @@ -609,15 +633,33 @@ func (a *Agent) ProcessSnapshot(ctx context.Context, snapshot *snapshotTypes.Sna

var allowedMetricsSuffixes = []string{"upstream_rq_total", "upstream_rq_time", "upstream_rq_5xx"}

func (a *Agent) MetricsRelayHandler(logCtx context.Context, in *envoyMetrics.StreamMetricsMessage) {
// MetricsRelayHandler is invoked as a callback when the agent receive metrics from Envoy (sink).
func (a *Agent) MetricsRelayHandler(
logCtx context.Context,
in *envoyMetrics.StreamMetricsMessage,
) {
a.metricsRelayMutex.Lock()
defer a.metricsRelayMutex.Unlock()

metrics := in.GetEnvoyMetrics()
dlog.Debugf(logCtx, "received %d metrics", len(metrics))
metricCount := len(metrics)

if !time.Now().After(a.metricsBackoffUntil) {
dlog.Debugf(logCtx, "Drop %d metric(s); next push scheduled for %s",
metricCount, a.metricsBackoffUntil.String())
return
}

if a.comm != nil && !a.reportingStopped {

dlog.Infof(logCtx, "Received %d metric(s)", metricCount)

a.ambassadorAPIKeyMutex.Lock()
apikey := a.ambassadorAPIKey
a.ambassadorAPIKeyMutex.Unlock()

outMetrics := make([]*io_prometheus_client.MetricFamily, 0, len(metrics))

for _, metricFamily := range metrics {
for _, suffix := range allowedMetricsSuffixes {
if strings.HasSuffix(metricFamily.GetName(), suffix) {
Expand All @@ -631,9 +673,19 @@ func (a *Agent) MetricsRelayHandler(logCtx context.Context, in *envoyMetrics.Str
Identity: a.agentID,
EnvoyMetrics: outMetrics,
}
dlog.Debugf(logCtx, "relaying %d metrics", len(outMessage.GetEnvoyMetrics()))
if err := a.comm.StreamMetrics(logCtx, outMessage, apikey); err != nil {
dlog.Errorf(logCtx, "Error streaming metrics: %+v", err)

if relayedMetricCount := len(outMessage.GetEnvoyMetrics()); relayedMetricCount > 0 {

dlog.Infof(logCtx, "Relaying %d metric(s)", relayedMetricCount)

if err := a.comm.StreamMetrics(logCtx, outMessage, apikey); err != nil {
dlog.Errorf(logCtx, "error streaming metric(s): %+v", err)
}

a.metricsBackoffUntil = time.Now().Add(defaultMinReportPeriod)

dlog.Infof(logCtx, "Next metrics relay scheduled for %s",
a.metricsBackoffUntil.UTC().String())
}
}
}
Expand Down
87 changes: 87 additions & 0 deletions pkg/agent/agent_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package agent

import (
"github.com/datawire/ambassador/v2/pkg/api/agent"
envoyMetrics "github.com/datawire/ambassador/v2/pkg/api/envoy/service/metrics/v3"
"github.com/datawire/dlib/dlog"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

var (
counterType = io_prometheus_client.MetricType_COUNTER
acceptedMetric = &io_prometheus_client.MetricFamily{
Name: StrToPointer("cluster.apple_prod_443.upstream_rq_total"),
Type: &counterType,
Metric: []*io_prometheus_client.Metric{
{
Counter: &io_prometheus_client.Counter{
Value: Float64ToPointer(42),
},
TimestampMs: Int64ToPointer(time.Now().Unix() * 1000),
},
},
}
ignoredMetric = &io_prometheus_client.MetricFamily{
Name: StrToPointer("cluster.apple_prod_443.metric_to_ignore"),
Type: &counterType,
Metric: []*io_prometheus_client.Metric{
{
Counter: &io_prometheus_client.Counter{
Value: Float64ToPointer(42),
},
TimestampMs: Int64ToPointer(time.Now().Unix() * 1000),
},
},
}
)

func agentMetricsSetupTest() (*MockClient, *Agent) {
clientMock := &MockClient{}

stubbedAgent := &Agent{
metricsBackoffUntil: time.Time{},
comm: &RPCComm{
client: clientMock,
},
}

return clientMock, stubbedAgent
}

func TestMetricsRelayHandler(t *testing.T) {

t.Run("will relay the metrics", func(t *testing.T) {
//given
clientMock, stubbedAgent := agentMetricsSetupTest()
ctx := dlog.NewTestContext(t, true)

//when
stubbedAgent.MetricsRelayHandler(ctx, &envoyMetrics.StreamMetricsMessage{
Identifier: nil,
EnvoyMetrics: []*io_prometheus_client.MetricFamily{ignoredMetric, acceptedMetric},
})

//then
assert.Equal(t, []*agent.StreamMetricsMessage{{
EnvoyMetrics: []*io_prometheus_client.MetricFamily{acceptedMetric},
}}, clientMock.SentMetrics)
})
t.Run("will not relay the metrics since it is in cool down period.", func(t *testing.T) {
//given
clientMock, stubbedAgent := agentMetricsSetupTest()
ctx := dlog.NewTestContext(t, true)
stubbedAgent.metricsBackoffUntil = time.Now().Add(defaultMinReportPeriod)

//when
stubbedAgent.MetricsRelayHandler(ctx, &envoyMetrics.StreamMetricsMessage{
Identifier: nil,
EnvoyMetrics: []*io_prometheus_client.MetricFamily{acceptedMetric},
})

//then
assert.Equal(t, 0, len(clientMock.SentMetrics))
})
}
36 changes: 25 additions & 11 deletions pkg/agent/comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type RPCComm struct {
agentID *agent.Identity
directives chan *agent.Directive
metricsStreamWriterMutex sync.Mutex
extraHeaders []string
}

const (
Expand Down Expand Up @@ -64,7 +65,13 @@ func connInfoFromAddress(address string) (*ConnInfo, error) {
return &ConnInfo{hostname, port, secure}, nil
}

func NewComm(ctx context.Context, connInfo *ConnInfo, agentID *agent.Identity, apiKey string) (*RPCComm, error) {
func NewComm(
ctx context.Context,
connInfo *ConnInfo,
agentID *agent.Identity,
apiKey string,
extraHeaders []string,
) (*RPCComm, error) {
ctx = dlog.WithField(ctx, "agent", "comm")
opts := make([]grpc.DialOption, 0, 1)
address := connInfo.hostname + ":" + connInfo.port
Expand All @@ -88,20 +95,26 @@ func NewComm(ctx context.Context, connInfo *ConnInfo, agentID *agent.Identity, a
retCtx, retCancel := context.WithCancel(ctx)

c := &RPCComm{
conn: conn,
client: client,
retCancel: retCancel,
agentID: agentID,
directives: make(chan *agent.Directive, 1),
rptWake: make(chan struct{}, 1),
conn: conn,
client: client,
retCancel: retCancel,
agentID: agentID,
directives: make(chan *agent.Directive, 1),
rptWake: make(chan struct{}, 1),
extraHeaders: extraHeaders,
}
retCtx = metadata.AppendToOutgoingContext(retCtx, APIKeyMetadataKey, apiKey)
retCtx = metadata.AppendToOutgoingContext(ctx, c.getHeaders(apiKey)...)

go c.retrieveLoop(retCtx)

return c, nil
}

func (c *RPCComm) getHeaders(apiKey string) []string {
return append([]string{
APIKeyMetadataKey, apiKey}, c.extraHeaders...)
}

func (c *RPCComm) retrieveLoop(ctx context.Context) {
ctx = dlog.WithField(ctx, "agent", "retriever")

Expand Down Expand Up @@ -146,7 +159,7 @@ func (c *RPCComm) Close() error {
}

func (c *RPCComm) ReportCommandResult(ctx context.Context, result *agent.CommandResult, apiKey string) error {
ctx = metadata.AppendToOutgoingContext(ctx, APIKeyMetadataKey, apiKey)
ctx = metadata.AppendToOutgoingContext(ctx, c.getHeaders(apiKey)...)
_, err := c.client.ReportCommandResult(ctx, result, grpc.EmptyCallOption{})
if err != nil {
return fmt.Errorf("ReportCommandResult error: %w", err)
Expand All @@ -159,7 +172,7 @@ func (c *RPCComm) Report(ctx context.Context, report *agent.Snapshot, apiKey str
case c.rptWake <- struct{}{}:
default:
}
ctx = metadata.AppendToOutgoingContext(ctx, APIKeyMetadataKey, apiKey)
ctx = metadata.AppendToOutgoingContext(ctx, c.getHeaders(apiKey)...)

// marshal snapshot
data, err := json.Marshal(report)
Expand Down Expand Up @@ -206,8 +219,9 @@ func (c *RPCComm) StreamMetrics(ctx context.Context, metrics *agent.StreamMetric

c.metricsStreamWriterMutex.Lock()
defer c.metricsStreamWriterMutex.Unlock()
ctx = metadata.AppendToOutgoingContext(ctx, APIKeyMetadataKey, apiKey)
ctx = metadata.AppendToOutgoingContext(ctx, c.getHeaders(apiKey)...)
streamClient, err := c.client.StreamMetrics(ctx)

if err != nil {
return err
}
Expand Down
28 changes: 27 additions & 1 deletion pkg/agent/comm_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
type MockClient struct {
Counter int64
grpc.ClientStream
SentMetrics []*agent.StreamMetricsMessage
SentSnapshots []*agent.Snapshot
snapMux sync.Mutex
reportFunc func(context.Context, *agent.Snapshot) (*agent.SnapshotResponse, error)
Expand Down Expand Up @@ -64,9 +65,34 @@ func (m *MockClient) Report(ctx context.Context, in *agent.Snapshot, opts ...grp
}

func (m *MockClient) StreamMetrics(ctx context.Context, opts ...grpc.CallOption) (agent.Director_StreamMetricsClient, error) {
panic("implement me")
return &mockStreamMetricsClient{
ctx: ctx,
opts: opts,
parent: m,
}, nil
}

type mockStreamMetricsClient struct {
ctx context.Context
opts []grpc.CallOption
parent *MockClient
}

func (s *mockStreamMetricsClient) Send(msg *agent.StreamMetricsMessage) error {
s.parent.SentMetrics = append(s.parent.SentMetrics, msg)
return nil
}
func (s *mockStreamMetricsClient) CloseAndRecv() (*agent.StreamMetricsResponse, error) {
return nil, nil
}

func (s *mockStreamMetricsClient) Header() (metadata.MD, error) { return nil, nil }
func (s *mockStreamMetricsClient) Trailer() metadata.MD { return nil }
func (s *mockStreamMetricsClient) CloseSend() error { return nil }
func (s *mockStreamMetricsClient) Context() context.Context { return s.ctx }
func (s *mockStreamMetricsClient) SendMsg(m interface{}) error { return nil }
func (s *mockStreamMetricsClient) RecvMsg(m interface{}) error { return nil }

type mockReportStreamClient struct {
ctx context.Context
opts []grpc.CallOption
Expand Down
16 changes: 16 additions & 0 deletions pkg/agent/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package agent

// StrToPointer will return the pointer to the given string.
func StrToPointer(str string) *string {
return &str
}

// Float64ToPointer will return the pointer to the given float.
func Float64ToPointer(f float64) *float64 {
return &f
}

// Int64ToPointer will return the pointer to the given int64.
func Int64ToPointer(i int64) *int64 {
return &i
}

0 comments on commit 9098031

Please sign in to comment.