Skip to content

Commit

Permalink
Add delay in the agent between each metrics push
Browse files Browse the repository at this point in the history
Signed-off-by: Kévin Lambert <kevin.lambert.ca@gmail.com>
  • Loading branch information
knlambert committed Feb 22, 2022
1 parent 39a33c1 commit 548b6ab
Show file tree
Hide file tree
Showing 9 changed files with 545 additions and 5 deletions.
34 changes: 30 additions & 4 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ type Agent struct {

// Field selector for the k8s resources that the agent watches
agentWatchFieldSelector string

// A mutex related to the metrics endpoint action, to avoid concurrent (and useless) pushes.
metricsPushMutex sync.Mutex
// Earliest time at which the next metrics push is allowed; kept in memory to avoid
// making too many requests to the Ambassador Cloud API.
nextMetricsPushDeadline time.Time
}

func getEnvWithDefault(envVarKey string, defaultValue string) string {
Expand Down Expand Up @@ -149,6 +155,7 @@ func NewAgent(directiveHandler DirectiveHandler, rolloutsGetterFactory rolloutsG
directiveHandler: directiveHandler,
reportRunning: atomicBool{value: false},
agentWatchFieldSelector: getEnvWithDefault("AGENT_WATCH_FIELD_SELECTOR", "metadata.namespace!=kube-system"),
nextMetricsPushDeadline: time.Now(),
}
}

Expand Down Expand Up @@ -609,15 +616,30 @@ func (a *Agent) ProcessSnapshot(ctx context.Context, snapshot *snapshotTypes.Sna

var allowedMetricsSuffixes = []string{"upstream_rq_total", "upstream_rq_time", "upstream_rq_5xx"}

func (a *Agent) MetricsRelayHandler(logCtx context.Context, in *envoyMetrics.StreamMetricsMessage) {
metrics := in.GetEnvoyMetrics()
dlog.Debugf(logCtx, "received %d metrics", len(metrics))
// MetricsRelayHandler is invoked as a callback when the agent receives metrics from Envoy (sink).
func (a *Agent) MetricsRelayHandler(
logCtx context.Context,
in *envoyMetrics.StreamMetricsMessage,
) {
a.metricsPushMutex.Lock()
defer a.metricsPushMutex.Unlock()

if !time.Now().After(a.nextMetricsPushDeadline) {
dlog.Debugf(logCtx, "pass metrics relay; next push scheduled for %s",
a.nextMetricsPushDeadline.String())
return
}

if a.comm != nil && !a.reportingStopped {
metrics := in.GetEnvoyMetrics()
dlog.Debugf(logCtx, "received %d metrics", len(metrics))

a.ambassadorAPIKeyMutex.Lock()
apikey := a.ambassadorAPIKey
a.ambassadorAPIKeyMutex.Unlock()

outMetrics := make([]*io_prometheus_client.MetricFamily, 0, len(metrics))

for _, metricFamily := range metrics {
for _, suffix := range allowedMetricsSuffixes {
if strings.HasSuffix(metricFamily.GetName(), suffix) {
Expand All @@ -631,10 +653,14 @@ func (a *Agent) MetricsRelayHandler(logCtx context.Context, in *envoyMetrics.Str
Identity: a.agentID,
EnvoyMetrics: outMetrics,
}

dlog.Debugf(logCtx, "relaying %d metrics", len(outMessage.GetEnvoyMetrics()))

if err := a.comm.StreamMetrics(logCtx, outMessage, apikey); err != nil {
dlog.Errorf(logCtx, "Error streaming metrics: %+v", err)
dlog.Errorf(logCtx, "error streaming metrics: %+v", err)
}

a.nextMetricsPushDeadline = time.Now().Add(defaultMinReportPeriod)
}
}

Expand Down
98 changes: 98 additions & 0 deletions pkg/agent/agent_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package agent

import (
"context"
"github.com/datawire/ambassador/v2/pkg/api/agent"
envoyMetrics "github.com/datawire/ambassador/v2/pkg/api/envoy/service/metrics/v3"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"testing"
"time"
)

// Metric fixtures shared by the MetricsRelayHandler tests.
var (
	// counterType gives the COUNTER enum value an address, because the
	// MetricFamily.Type field is a pointer.
	counterType = io_prometheus_client.MetricType_COUNTER

	// acceptedMetric has a name ending in "upstream_rq_total", one of the
	// suffixes listed in allowedMetricsSuffixes.
	acceptedMetric = newCounterFamilyFixture("cluster.apple_prod_443.upstream_rq_total")

	// ignoredMetric has a name that matches none of the allowed suffixes.
	ignoredMetric = newCounterFamilyFixture("cluster.apple_prod_443.metric_to_ignore")
)

// newCounterFamilyFixture builds a counter metric family called name that
// holds a single sample with value 42, timestamped with the current time in
// milliseconds (second resolution).
func newCounterFamilyFixture(name string) *io_prometheus_client.MetricFamily {
	sample := &io_prometheus_client.Metric{
		Counter: &io_prometheus_client.Counter{
			Value: Float64ToPointer(42),
		},
		TimestampMs: Int64ToPointer(time.Now().Unix() * 1000),
	}

	return &io_prometheus_client.MetricFamily{
		Name:   StrToPointer(name),
		Type:   &counterType,
		Metric: []*io_prometheus_client.Metric{sample},
	}
}

// AgentMetricsSuite is a testify suite exercising Agent.MetricsRelayHandler
// against a mocked client.
type AgentMetricsSuite struct {
	suite.Suite

	// stubbedAgent is the Agent under test; SetupTest rebuilds it for each test.
	stubbedAgent *Agent
	// clientMock is the client wired into stubbedAgent's RPCComm; tests read
	// its SentMetrics field to see what was relayed.
	clientMock *MockClient
}

// SetupTest gives every test a fresh mock client and an Agent whose comm
// layer sends through that mock.
func (s *AgentMetricsSuite) SetupTest() {
	mock := &MockClient{}
	transport := &RPCComm{client: mock}

	s.clientMock = mock
	s.stubbedAgent = &Agent{
		comm: transport,
		// The zero time lies in the past, so the first push is not delayed.
		nextMetricsPushDeadline: time.Time{},
	}
}

// AfterTest is the per-test teardown hook. It is intentionally a no-op:
// SetupTest rebuilds the mock client and the agent before every test, so
// there is nothing to clean up here.
//
// suiteName and testName identify the test that just finished and are unused.
func (s *AgentMetricsSuite) AfterTest(suiteName, testName string) {
}

// TestMetricsHandlerWithRelay checks that, when no push deadline is pending,
// the handler relays exactly one message containing only the metric whose
// name has an allowed suffix.
func (s *AgentMetricsSuite) TestMetricsHandlerWithRelay() {
	// given: an incoming message mixing an ignored and an accepted metric
	incoming := &envoyMetrics.StreamMetricsMessage{
		Identifier:   nil,
		EnvoyMetrics: []*io_prometheus_client.MetricFamily{ignoredMetric, acceptedMetric},
	}
	want := []*agent.StreamMetricsMessage{{
		EnvoyMetrics: []*io_prometheus_client.MetricFamily{acceptedMetric},
	}}

	// when
	s.stubbedAgent.MetricsRelayHandler(context.TODO(), incoming)

	// then: only the accepted metric reached the client
	assert.Equal(s.T(), want, s.clientMock.SentMetrics)
}

// TestMetricsHandlerWithRelayPass checks that nothing is relayed while the
// next push deadline is still in the future.
func (s *AgentMetricsSuite) TestMetricsHandlerWithRelayPass() {
	// given: a deadline one report period ahead of now
	s.stubbedAgent.nextMetricsPushDeadline = time.Now().Add(defaultMinReportPeriod)
	incoming := &envoyMetrics.StreamMetricsMessage{
		Identifier:   nil,
		EnvoyMetrics: []*io_prometheus_client.MetricFamily{acceptedMetric},
	}

	// when
	s.stubbedAgent.MetricsRelayHandler(context.TODO(), incoming)

	// then: the mock client recorded no message
	relayed := s.clientMock.SentMetrics
	assert.Equal(s.T(), 0, len(relayed))
}

func TestSuiteAgentMetrics(t *testing.T) {
suite.Run(t, new(AgentMetricsSuite))
}
28 changes: 27 additions & 1 deletion pkg/agent/comm_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
type MockClient struct {
Counter int64
grpc.ClientStream
SentMetrics []*agent.StreamMetricsMessage
SentSnapshots []*agent.Snapshot
snapMux sync.Mutex
reportFunc func(context.Context, *agent.Snapshot) (*agent.SnapshotResponse, error)
Expand Down Expand Up @@ -64,9 +65,34 @@ func (m *MockClient) Report(ctx context.Context, in *agent.Snapshot, opts ...grp
}

// StreamMetrics returns a fake metrics stream bound to this mock. Every
// message sent on the returned stream is appended to m.SentMetrics so tests
// can inspect what was relayed.
//
// ctx is what the returned stream reports from Context(); opts are stored on
// the stream. The returned error is always nil.
func (m *MockClient) StreamMetrics(ctx context.Context, opts ...grpc.CallOption) (agent.Director_StreamMetricsClient, error) {
	// The former panic("implement me") placeholder is removed: it ran
	// unconditionally and made the return below unreachable.
	return &mockStreamMetricsClient{
		ctx:    ctx,
		opts:   opts,
		parent: m,
	}, nil
}

type mockStreamMetricsClient struct {
ctx context.Context
opts []grpc.CallOption
parent *MockClient
}

// Send records msg on the parent MockClient's SentMetrics slice and always
// reports success.
func (s *mockStreamMetricsClient) Send(msg *agent.StreamMetricsMessage) error {
	recorder := s.parent
	recorder.SentMetrics = append(recorder.SentMetrics, msg)

	return nil
}
// CloseAndRecv is a stub: it returns a nil response and a nil error.
func (s *mockStreamMetricsClient) CloseAndRecv() (*agent.StreamMetricsResponse, error) {
	var noResponse *agent.StreamMetricsResponse

	return noResponse, nil
}

// The remaining stream methods are stubs; only Context returns stored state.

// Header returns no metadata and no error.
func (s *mockStreamMetricsClient) Header() (metadata.MD, error) {
	return nil, nil
}

// Trailer returns no metadata.
func (s *mockStreamMetricsClient) Trailer() metadata.MD {
	return nil
}

// CloseSend does nothing and reports success.
func (s *mockStreamMetricsClient) CloseSend() error {
	return nil
}

// Context returns the context the stream was created with.
func (s *mockStreamMetricsClient) Context() context.Context {
	return s.ctx
}

// SendMsg discards m and reports success.
func (s *mockStreamMetricsClient) SendMsg(m interface{}) error {
	return nil
}

// RecvMsg leaves m untouched and reports success.
func (s *mockStreamMetricsClient) RecvMsg(m interface{}) error {
	return nil
}

type mockReportStreamClient struct {
ctx context.Context
opts []grpc.CallOption
Expand Down
16 changes: 16 additions & 0 deletions pkg/agent/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package agent

// StrToPointer copies str into newly allocated storage and returns its
// address, so each call yields a distinct pointer.
func StrToPointer(str string) *string {
	ptr := new(string)
	*ptr = str
	return ptr
}

// Float64ToPointer copies f into newly allocated storage and returns its
// address, so each call yields a distinct pointer.
func Float64ToPointer(f float64) *float64 {
	ptr := new(float64)
	*ptr = f
	return ptr
}

// Int64ToPointer copies i into newly allocated storage and returns its
// address, so each call yields a distinct pointer.
func Int64ToPointer(i int64) *int64 {
	ptr := new(int64)
	*ptr = i
	return ptr
}
65 changes: 65 additions & 0 deletions vendor/github.com/stretchr/testify/suite/doc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 53 additions & 0 deletions vendor/github.com/stretchr/testify/suite/interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions vendor/github.com/stretchr/testify/suite/stats.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 548b6ab

Please sign in to comment.