Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics sync for 2.3 #4207

Merged
merged 6 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ Please see the [Envoy documentation](https://www.envoyproxy.io/docs/envoy/latest
- Bugfix: Kubernetes Secrets that should contain an EC (Elliptic Curve) TLS Private Key are now
properly validated. ([4134])

- Change: The new delay between two metrics syncs is now 30s. ([#4122])

[4134]: https://github.com/emissary-ingress/emissary/issues/4134
[#4122]: https://github.com/emissary-ingress/emissary/pull/4122

## [2.2.1] February 22, 2022
[2.2.1]: https://github.com/emissary-ingress/emissary/compare/v2.2.0...v2.2.1
Expand Down
8 changes: 8 additions & 0 deletions docs/releaseNotes.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ items:
link: https://github.com/emissary-ingress/emissary/issues/4134
docs: https://github.com/emissary-ingress/emissary/issues/4134

- title: Decrease metric sync frequency
type: change
body: >-
The new delay between two metrics syncs is now 30s.
github:
- title: "#4122"
link: https://github.com/emissary-ingress/emissary/pull/4122

- version: 2.2.1
date: '2022-02-22'
notes:
Expand Down
64 changes: 58 additions & 6 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ type Agent struct {

// Field selector for the k8s resources that the agent watches
agentWatchFieldSelector string

// A mutex related to the metrics endpoint action, to avoid concurrent (and useless) pushes.
metricsRelayMutex sync.Mutex
// Timestamp to keep in memory to Prevent from making too many requests to the Ambassador
// Cloud API.
metricsBackoffUntil time.Time

// Extra headers to inject into RPC requests to ambassador cloud.
rpcExtraHeaders []string
}

func getEnvWithDefault(envVarKey string, defaultValue string) string {
Expand Down Expand Up @@ -135,6 +144,17 @@ func NewAgent(directiveHandler DirectiveHandler, rolloutsGetterFactory rolloutsG
}
}

var rpcExtraHeaders = make([]string, 0)

if os.Getenv("RPC_INTERCEPT_HEADER_KEY") != "" &&
os.Getenv("RPC_INTERCEPT_HEADER_VALUE") != "" {
rpcExtraHeaders = append(
rpcExtraHeaders,
os.Getenv("RPC_INTERCEPT_HEADER_KEY"),
os.Getenv("RPC_INTERCEPT_HEADER_VALUE"),
)
}

return &Agent{
minReportPeriod: reportPeriod,
reportComplete: make(chan error),
Expand All @@ -149,6 +169,8 @@ func NewAgent(directiveHandler DirectiveHandler, rolloutsGetterFactory rolloutsG
directiveHandler: directiveHandler,
reportRunning: atomicBool{value: false},
agentWatchFieldSelector: getEnvWithDefault("AGENT_WATCH_FIELD_SELECTOR", "metadata.namespace!=kube-system"),
metricsBackoffUntil: time.Now(),
rpcExtraHeaders: rpcExtraHeaders,
}
}

Expand Down Expand Up @@ -469,7 +491,9 @@ func (a *Agent) MaybeReport(ctx context.Context) {
// The communications channel to the DCP was not yet created or was
// closed above, due to a change in identity, or close elsewhere, due to
// a change in endpoint configuration.
newComm, err := NewComm(ctx, a.connInfo, a.agentID, a.ambassadorAPIKey)
newComm, err := NewComm(
ctx, a.connInfo, a.agentID, a.ambassadorAPIKey, a.rpcExtraHeaders)

if err != nil {
dlog.Warnf(ctx, "Failed to dial the DCP: %v", err)
dlog.Warn(ctx, "DCP functionality disabled until next retry")
Expand Down Expand Up @@ -609,15 +633,33 @@ func (a *Agent) ProcessSnapshot(ctx context.Context, snapshot *snapshotTypes.Sna

var allowedMetricsSuffixes = []string{"upstream_rq_total", "upstream_rq_time", "upstream_rq_5xx"}

func (a *Agent) MetricsRelayHandler(logCtx context.Context, in *envoyMetrics.StreamMetricsMessage) {
// MetricsRelayHandler is invoked as a callback when the agent receive metrics from Envoy (sink).
func (a *Agent) MetricsRelayHandler(
logCtx context.Context,
in *envoyMetrics.StreamMetricsMessage,
) {
a.metricsRelayMutex.Lock()
defer a.metricsRelayMutex.Unlock()

metrics := in.GetEnvoyMetrics()
dlog.Debugf(logCtx, "received %d metrics", len(metrics))
metricCount := len(metrics)

if !time.Now().After(a.metricsBackoffUntil) {
dlog.Debugf(logCtx, "Drop %d metric(s); next push scheduled for %s",
metricCount, a.metricsBackoffUntil.String())
return
}

if a.comm != nil && !a.reportingStopped {

dlog.Infof(logCtx, "Received %d metric(s)", metricCount)

a.ambassadorAPIKeyMutex.Lock()
apikey := a.ambassadorAPIKey
a.ambassadorAPIKeyMutex.Unlock()

outMetrics := make([]*io_prometheus_client.MetricFamily, 0, len(metrics))

for _, metricFamily := range metrics {
for _, suffix := range allowedMetricsSuffixes {
if strings.HasSuffix(metricFamily.GetName(), suffix) {
Expand All @@ -631,9 +673,19 @@ func (a *Agent) MetricsRelayHandler(logCtx context.Context, in *envoyMetrics.Str
Identity: a.agentID,
EnvoyMetrics: outMetrics,
}
dlog.Debugf(logCtx, "relaying %d metrics", len(outMessage.GetEnvoyMetrics()))
if err := a.comm.StreamMetrics(logCtx, outMessage, apikey); err != nil {
dlog.Errorf(logCtx, "Error streaming metrics: %+v", err)

if relayedMetricCount := len(outMessage.GetEnvoyMetrics()); relayedMetricCount > 0 {

dlog.Infof(logCtx, "Relaying %d metric(s)", relayedMetricCount)

if err := a.comm.StreamMetrics(logCtx, outMessage, apikey); err != nil {
dlog.Errorf(logCtx, "error streaming metric(s): %+v", err)
}

a.metricsBackoffUntil = time.Now().Add(defaultMinReportPeriod)

dlog.Infof(logCtx, "Next metrics relay scheduled for %s",
a.metricsBackoffUntil.UTC().String())
}
}
}
Expand Down
87 changes: 87 additions & 0 deletions pkg/agent/agent_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package agent

import (
"github.com/datawire/ambassador/v2/pkg/api/agent"
envoyMetrics "github.com/datawire/ambassador/v2/pkg/api/envoy/service/metrics/v3"
"github.com/datawire/dlib/dlog"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

var (
counterType = io_prometheus_client.MetricType_COUNTER
acceptedMetric = &io_prometheus_client.MetricFamily{
Name: StrToPointer("cluster.apple_prod_443.upstream_rq_total"),
Type: &counterType,
Metric: []*io_prometheus_client.Metric{
{
Counter: &io_prometheus_client.Counter{
Value: Float64ToPointer(42),
},
TimestampMs: Int64ToPointer(time.Now().Unix() * 1000),
},
},
}
ignoredMetric = &io_prometheus_client.MetricFamily{
Name: StrToPointer("cluster.apple_prod_443.metric_to_ignore"),
Type: &counterType,
Metric: []*io_prometheus_client.Metric{
{
Counter: &io_prometheus_client.Counter{
Value: Float64ToPointer(42),
},
TimestampMs: Int64ToPointer(time.Now().Unix() * 1000),
},
},
}
)

func agentMetricsSetupTest() (*MockClient, *Agent) {
clientMock := &MockClient{}

stubbedAgent := &Agent{
metricsBackoffUntil: time.Time{},
comm: &RPCComm{
client: clientMock,
},
}

return clientMock, stubbedAgent
}

func TestMetricsRelayHandler(t *testing.T) {

t.Run("will relay the metrics", func(t *testing.T) {
//given
clientMock, stubbedAgent := agentMetricsSetupTest()
ctx := dlog.NewTestContext(t, true)

//when
stubbedAgent.MetricsRelayHandler(ctx, &envoyMetrics.StreamMetricsMessage{
Identifier: nil,
EnvoyMetrics: []*io_prometheus_client.MetricFamily{ignoredMetric, acceptedMetric},
})

//then
assert.Equal(t, []*agent.StreamMetricsMessage{{
EnvoyMetrics: []*io_prometheus_client.MetricFamily{acceptedMetric},
}}, clientMock.SentMetrics)
})
t.Run("will not relay the metrics since it is in cool down period.", func(t *testing.T) {
//given
clientMock, stubbedAgent := agentMetricsSetupTest()
ctx := dlog.NewTestContext(t, true)
stubbedAgent.metricsBackoffUntil = time.Now().Add(defaultMinReportPeriod)

//when
stubbedAgent.MetricsRelayHandler(ctx, &envoyMetrics.StreamMetricsMessage{
Identifier: nil,
EnvoyMetrics: []*io_prometheus_client.MetricFamily{acceptedMetric},
})

//then
assert.Equal(t, 0, len(clientMock.SentMetrics))
})
}
36 changes: 25 additions & 11 deletions pkg/agent/comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type RPCComm struct {
agentID *agent.Identity
directives chan *agent.Directive
metricsStreamWriterMutex sync.Mutex
extraHeaders []string
}

const (
Expand Down Expand Up @@ -64,7 +65,13 @@ func connInfoFromAddress(address string) (*ConnInfo, error) {
return &ConnInfo{hostname, port, secure}, nil
}

func NewComm(ctx context.Context, connInfo *ConnInfo, agentID *agent.Identity, apiKey string) (*RPCComm, error) {
func NewComm(
ctx context.Context,
connInfo *ConnInfo,
agentID *agent.Identity,
apiKey string,
extraHeaders []string,
) (*RPCComm, error) {
ctx = dlog.WithField(ctx, "agent", "comm")
opts := make([]grpc.DialOption, 0, 1)
address := connInfo.hostname + ":" + connInfo.port
Expand All @@ -88,20 +95,26 @@ func NewComm(ctx context.Context, connInfo *ConnInfo, agentID *agent.Identity, a
retCtx, retCancel := context.WithCancel(ctx)

c := &RPCComm{
conn: conn,
client: client,
retCancel: retCancel,
agentID: agentID,
directives: make(chan *agent.Directive, 1),
rptWake: make(chan struct{}, 1),
conn: conn,
client: client,
retCancel: retCancel,
agentID: agentID,
directives: make(chan *agent.Directive, 1),
rptWake: make(chan struct{}, 1),
extraHeaders: extraHeaders,
}
retCtx = metadata.AppendToOutgoingContext(retCtx, APIKeyMetadataKey, apiKey)
retCtx = metadata.AppendToOutgoingContext(ctx, c.getHeaders(apiKey)...)

go c.retrieveLoop(retCtx)

return c, nil
}

func (c *RPCComm) getHeaders(apiKey string) []string {
return append([]string{
APIKeyMetadataKey, apiKey}, c.extraHeaders...)
}

func (c *RPCComm) retrieveLoop(ctx context.Context) {
ctx = dlog.WithField(ctx, "agent", "retriever")

Expand Down Expand Up @@ -146,7 +159,7 @@ func (c *RPCComm) Close() error {
}

func (c *RPCComm) ReportCommandResult(ctx context.Context, result *agent.CommandResult, apiKey string) error {
ctx = metadata.AppendToOutgoingContext(ctx, APIKeyMetadataKey, apiKey)
ctx = metadata.AppendToOutgoingContext(ctx, c.getHeaders(apiKey)...)
_, err := c.client.ReportCommandResult(ctx, result, grpc.EmptyCallOption{})
if err != nil {
return fmt.Errorf("ReportCommandResult error: %w", err)
Expand All @@ -159,7 +172,7 @@ func (c *RPCComm) Report(ctx context.Context, report *agent.Snapshot, apiKey str
case c.rptWake <- struct{}{}:
default:
}
ctx = metadata.AppendToOutgoingContext(ctx, APIKeyMetadataKey, apiKey)
ctx = metadata.AppendToOutgoingContext(ctx, c.getHeaders(apiKey)...)

// marshal snapshot
data, err := json.Marshal(report)
Expand Down Expand Up @@ -206,8 +219,9 @@ func (c *RPCComm) StreamMetrics(ctx context.Context, metrics *agent.StreamMetric

c.metricsStreamWriterMutex.Lock()
defer c.metricsStreamWriterMutex.Unlock()
ctx = metadata.AppendToOutgoingContext(ctx, APIKeyMetadataKey, apiKey)
ctx = metadata.AppendToOutgoingContext(ctx, c.getHeaders(apiKey)...)
streamClient, err := c.client.StreamMetrics(ctx)

if err != nil {
return err
}
Expand Down
28 changes: 27 additions & 1 deletion pkg/agent/comm_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
type MockClient struct {
Counter int64
grpc.ClientStream
SentMetrics []*agent.StreamMetricsMessage
SentSnapshots []*agent.Snapshot
snapMux sync.Mutex
reportFunc func(context.Context, *agent.Snapshot) (*agent.SnapshotResponse, error)
Expand Down Expand Up @@ -64,9 +65,34 @@ func (m *MockClient) Report(ctx context.Context, in *agent.Snapshot, opts ...grp
}

func (m *MockClient) StreamMetrics(ctx context.Context, opts ...grpc.CallOption) (agent.Director_StreamMetricsClient, error) {
panic("implement me")
return &mockStreamMetricsClient{
ctx: ctx,
opts: opts,
parent: m,
}, nil
}

type mockStreamMetricsClient struct {
ctx context.Context
opts []grpc.CallOption
parent *MockClient
}

func (s *mockStreamMetricsClient) Send(msg *agent.StreamMetricsMessage) error {
s.parent.SentMetrics = append(s.parent.SentMetrics, msg)
return nil
}
func (s *mockStreamMetricsClient) CloseAndRecv() (*agent.StreamMetricsResponse, error) {
return nil, nil
}

func (s *mockStreamMetricsClient) Header() (metadata.MD, error) { return nil, nil }
func (s *mockStreamMetricsClient) Trailer() metadata.MD { return nil }
func (s *mockStreamMetricsClient) CloseSend() error { return nil }
func (s *mockStreamMetricsClient) Context() context.Context { return s.ctx }
func (s *mockStreamMetricsClient) SendMsg(m interface{}) error { return nil }
func (s *mockStreamMetricsClient) RecvMsg(m interface{}) error { return nil }

type mockReportStreamClient struct {
ctx context.Context
opts []grpc.CallOption
Expand Down
16 changes: 16 additions & 0 deletions pkg/agent/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package agent

// StrToPointer will return the pointer to the given string.
func StrToPointer(str string) *string {
return &str
}

// Float64ToPointer will return the pointer to the given float.
func Float64ToPointer(f float64) *float64 {
return &f
}

// Int64ToPointer will return the pointer to the given int64.
func Int64ToPointer(i int64) *int64 {
return &i
}
2 changes: 1 addition & 1 deletion python/ambassador/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def parse_bool(s: Optional[Union[str, bool]]) -> bool:

# OK, we got _something_, so try strtobool.
try:
return strtobool(s)
return bool(strtobool(s)) # the linter does not like a Literal[0, 1] being returned here
except ValueError:
return False

Expand Down