Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of Allow HCP metrics collection for Envoy proxies into release/1.15.x #16611

Merged
merged 1 commit into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/16585.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
xds: Allow for configuring connect proxies to send service mesh telemetry to an HCP metrics collection service.
```
71 changes: 71 additions & 0 deletions agent/proxycfg/connect_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package proxycfg
import (
"context"
"fmt"
"path"
"strings"

"github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/mitchellh/mapstructure"
)

type handlerConnectProxy struct {
Expand Down Expand Up @@ -103,6 +107,10 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
return snap, err
}

if err := s.maybeInitializeHCPMetricsWatches(ctx, snap); err != nil {
return snap, fmt.Errorf("failed to initialize HCP metrics watches: %w", err)
}

if s.proxyCfg.Mode == structs.ProxyModeTransparent {
// When in transparent proxy we will infer upstreams from intentions with this source
err := s.dataSources.IntentionUpstreams.Notify(ctx, &structs.ServiceSpecificRequest{
Expand Down Expand Up @@ -614,3 +622,66 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
}
return nil
}

// hcpMetricsConfig holds the subset of a proxy's opaque config that controls
// pushing telemetry to HCP.
type hcpMetricsConfig struct {
	// HCPMetricsBindSocketDir is decoded from the
	// "envoy_hcp_metrics_bind_socket_dir" key. It names the directory for the
	// unix socket to which Envoy forwards its metrics; those metrics are then
	// pushed to the HCP Metrics collector so that service mesh metrics show
	// up on HCP.
	HCPMetricsBindSocketDir string `mapstructure:"envoy_hcp_metrics_bind_socket_dir"`
}

// parseHCPMetricsConfig weakly decodes the opaque proxy config map m into an
// hcpMetricsConfig. When decoding fails, the wrapped error is returned along
// with the config value exactly as the decoder left it.
func parseHCPMetricsConfig(m map[string]interface{}) (hcpMetricsConfig, error) {
	var parsed hcpMetricsConfig
	if decodeErr := mapstructure.WeakDecode(m, &parsed); decodeErr != nil {
		return parsed, fmt.Errorf("failed to decode: %w", decodeErr)
	}
	return parsed, nil
}

// maybeInitializeHCPMetricsWatches will initialize a synthetic upstream and discovery chain
// watch for the HCP metrics collector, if metrics collection is enabled on the proxy registration.
//
// Collection counts as enabled when the proxy's opaque config yields a
// non-empty envoy_hcp_metrics_bind_socket_dir. A failure to parse that config
// is logged but does not abort initialization. The returned error is non-nil
// only when registering the discovery chain watch fails, and it wraps the
// underlying cause so callers can inspect it with errors.Is / errors.As.
func (s *handlerConnectProxy) maybeInitializeHCPMetricsWatches(ctx context.Context, snap ConfigSnapshot) error {
	hcpCfg, err := parseHCPMetricsConfig(s.proxyCfg.Config)
	if err != nil {
		// Best effort: a malformed opaque config is reported here and we carry
		// on with whatever was decoded rather than failing the whole proxy.
		s.logger.Error("failed to parse connect.proxy.config", "error", err)
	}

	if hcpCfg.HCPMetricsBindSocketDir == "" {
		// Metrics collection is not enabled, return early.
		return nil
	}

	// The path includes the proxy ID so that when multiple proxies are on the same host
	// they each have a distinct path to send their metrics.
	sockName := fmt.Sprintf("%s_%s.sock", s.proxyID.NamespaceOrDefault(), s.proxyID.ID)
	// Named sockPath (not path) so the imported path package is not shadowed.
	sockPath := path.Join(hcpCfg.HCPMetricsBindSocketDir, sockName)

	upstream := structs.Upstream{
		DestinationNamespace: acl.DefaultNamespaceName,
		DestinationPartition: s.proxyID.PartitionOrDefault(),
		DestinationName:      api.HCPMetricsCollectorName,
		LocalBindSocketPath:  sockPath,
		Config: map[string]interface{}{
			"protocol": "grpc",
		},
	}
	uid := NewUpstreamID(&upstream)
	// NOTE(review): this assumes the caller has already initialized
	// snap.ConnectProxy.UpstreamConfig — confirm against initialize().
	snap.ConnectProxy.UpstreamConfig[uid] = &upstream

	err = s.dataSources.CompiledDiscoveryChain.Notify(ctx, &structs.DiscoveryChainRequest{
		Datacenter:           s.source.Datacenter,
		QueryOptions:         structs.QueryOptions{Token: s.token},
		Name:                 upstream.DestinationName,
		EvaluateInDatacenter: s.source.Datacenter,
		EvaluateInNamespace:  uid.NamespaceOrDefault(),
		EvaluateInPartition:  uid.PartitionOrDefault(),
	}, "discovery-chain:"+uid.String(), s.ch)
	if err != nil {
		// Wrap with %w (not %v) so the cause stays in the error chain.
		return fmt.Errorf("failed to watch discovery chain for %s: %w", uid.String(), err)
	}
	return nil
}
181 changes: 171 additions & 10 deletions agent/proxycfg/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs"
apimod "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/prototest"
"github.com/hashicorp/consul/sdk/testutil"
Expand Down Expand Up @@ -455,16 +456,18 @@ func TestState_WatchesAndUpdates(t *testing.T) {

// Used to account for differences in OSS/ent implementations of ServiceID.String()
var (
db = structs.NewServiceName("db", nil)
billing = structs.NewServiceName("billing", nil)
api = structs.NewServiceName("api", nil)
apiA = structs.NewServiceName("api-a", nil)

apiUID = NewUpstreamIDFromServiceName(api)
dbUID = NewUpstreamIDFromServiceName(db)
pqUID = UpstreamIDFromString("prepared_query:query")
extApiUID = NewUpstreamIDFromServiceName(apiA)
extDBUID = NewUpstreamIDFromServiceName(db)
db = structs.NewServiceName("db", nil)
billing = structs.NewServiceName("billing", nil)
api = structs.NewServiceName("api", nil)
apiA = structs.NewServiceName("api-a", nil)
hcpCollector = structs.NewServiceName(apimod.HCPMetricsCollectorName, nil)

apiUID = NewUpstreamIDFromServiceName(api)
dbUID = NewUpstreamIDFromServiceName(db)
pqUID = UpstreamIDFromString("prepared_query:query")
extApiUID = NewUpstreamIDFromServiceName(apiA)
extDBUID = NewUpstreamIDFromServiceName(db)
hcpCollectorUID = NewUpstreamIDFromServiceName(hcpCollector)
)
// TODO(peering): NewUpstreamIDFromServiceName should take a PeerName
extApiUID.Peer = "peer-a"
Expand Down Expand Up @@ -3623,6 +3626,164 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
},
},
"hcp-metrics": {
ns: structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-sidecar-proxy",
Service: "web-sidecar-proxy",
Address: "10.0.1.1",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Config: map[string]interface{}{
"envoy_hcp_metrics_bind_socket_dir": "/tmp/consul/hcp-metrics/",
},
},
},
sourceDC: "dc1",
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
fmt.Sprintf("discovery-chain:%s", hcpCollectorUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: hcpCollector.Name,
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
EvaluateInPartition: "default",
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{
Token: aclToken,
},
}),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "should not be valid")

require.Len(t, snap.ConnectProxy.DiscoveryChain, 0, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Len(t, snap.ConnectProxy.WatchedDiscoveryChains, 0, "%+v", snap.ConnectProxy.WatchedDiscoveryChains)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 0, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 0, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
},
},
{
events: []UpdateEvent{
rootWatchEvent(),
{
CorrelationID: peeringTrustBundlesWatchID,
Result: peerTrustBundles,
},
{
CorrelationID: leafWatchID,
Result: issuedCert,
Err: nil,
},
{
CorrelationID: intentionsWatchID,
Result: TestIntentions(),
Err: nil,
},
{
CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{},
},
{
CorrelationID: fmt.Sprintf("discovery-chain:%s", hcpCollectorUID.String()),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, hcpCollector.Name, "default", "default", "dc1", "trustdomain.consul", nil),
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid())
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.ConnectProxy.Leaf)

// An event was received with the HCP collector's discovery chain, which sets up some bookkeeping in the snapshot.
require.Len(t, snap.ConnectProxy.DiscoveryChain, 1, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Contains(t, snap.ConnectProxy.DiscoveryChain, hcpCollectorUID)

require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, hcpCollectorUID)

expectUpstream := structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: apimod.HCPMetricsCollectorName,
LocalBindSocketPath: "/tmp/consul/hcp-metrics/default_web-sidecar-proxy.sock",
Config: map[string]interface{}{
"protocol": "grpc",
},
}
uid := NewUpstreamID(&expectUpstream)

require.Contains(t, snap.ConnectProxy.UpstreamConfig, uid)
require.Equal(t, &expectUpstream, snap.ConnectProxy.UpstreamConfig[uid])

// No endpoints have arrived yet.
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[hcpCollectorUID], 0, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
},
},
{
requiredWatches: map[string]verifyWatchRequest{
fmt.Sprintf("upstream-target:%s.default.default.dc1:", apimod.HCPMetricsCollectorName) + hcpCollectorUID.String(): genVerifyServiceSpecificRequest(apimod.HCPMetricsCollectorName, "", "dc1", true),
},
events: []UpdateEvent{
{
CorrelationID: fmt.Sprintf("upstream-target:%s.default.default.dc1:", apimod.HCPMetricsCollectorName) + hcpCollectorUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Node: &structs.Node{
Node: "node1",
Address: "10.0.0.1",
},
Service: &structs.NodeService{
ID: apimod.HCPMetricsCollectorName,
Service: apimod.HCPMetricsCollectorName,
Port: 8080,
},
},
},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid())
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.ConnectProxy.Leaf)

// Discovery chain for the HCP collector should still be stored in the snapshot.
require.Len(t, snap.ConnectProxy.DiscoveryChain, 1, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Contains(t, snap.ConnectProxy.DiscoveryChain, hcpCollectorUID)

require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, hcpCollectorUID)

// An endpoint arrived for the HCP collector, so it should be present in the snapshot.
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[hcpCollectorUID], 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)

nodes := structs.CheckServiceNodes{
{
Node: &structs.Node{
Node: "node1",
Address: "10.0.0.1",
},
Service: &structs.NodeService{
ID: apimod.HCPMetricsCollectorName,
Service: apimod.HCPMetricsCollectorName,
Port: 8080,
},
},
}
target := fmt.Sprintf("%s.default.default.dc1", apimod.HCPMetricsCollectorName)
require.Equal(t, nodes, snap.ConnectProxy.WatchedUpstreamEndpoints[hcpCollectorUID][target])
},
},
},
},
}

for name, tc := range cases {
Expand Down
50 changes: 50 additions & 0 deletions agent/proxycfg/testing_connect_proxy.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package proxycfg

import (
"fmt"
"time"

"github.com/mitchellh/go-testing-interface"
Expand All @@ -9,6 +10,7 @@ import (
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/types"
)

Expand Down Expand Up @@ -288,3 +290,51 @@ func TestConfigSnapshotGRPCExposeHTTP1(t testing.T) *ConfigSnapshot {
},
})
}

// TestConfigSnapshotHCPMetrics returns a snapshot, built via TestConfigSnapshot,
// for a proxy whose opaque config enables HCP metrics collection by setting
// envoy_hcp_metrics_bind_socket_dir. The snapshot is fed a nil mesh config
// entry, the compiled discovery chain for the HCP metrics collector, and a
// single endpoint for that collector.
func TestConfigSnapshotHCPMetrics(t testing.T) *ConfigSnapshot {
	// The collector is addressed by the same upstream ID that the correlation
	// IDs of the discovery-chain and upstream-target events below are built from.
	var (
		collector      = structs.NewServiceName(api.HCPMetricsCollectorName, nil)
		collectorUID   = NewUpstreamIDFromServiceName(collector)
		collectorChain = discoverychain.TestCompileConfigEntries(t, api.HCPMetricsCollectorName, "default", "default", "dc1", connect.TestClusterID+".consul", nil)
	)

	return TestConfigSnapshot(t, func(ns *structs.NodeService) {
		// Turn on HCP metrics collection for this proxy registration.
		ns.Proxy.Config = map[string]interface{}{
			"envoy_hcp_metrics_bind_socket_dir": "/tmp/consul/hcp-metrics",
		}
	}, []UpdateEvent{
		{
			CorrelationID: meshConfigEntryID,
			Result: &structs.ConfigEntryResponse{
				Entry: nil,
			},
		},
		{
			CorrelationID: "discovery-chain:" + collectorUID.String(),
			Result: &structs.DiscoveryChainResponse{
				Chain: collectorChain,
			},
		},
		{
			CorrelationID: fmt.Sprintf("upstream-target:%s.default.default.dc1:", api.HCPMetricsCollectorName) + collectorUID.String(),
			Result: &structs.IndexedCheckServiceNodes{
				Nodes: []structs.CheckServiceNode{
					{
						Node: &structs.Node{
							Address:    "8.8.8.8",
							Datacenter: "dc1",
						},
						Service: &structs.NodeService{
							Service: api.HCPMetricsCollectorName,
							Address: "9.9.9.9",
							Port:    9090,
						},
					},
				},
			},
		},
	})
}
4 changes: 4 additions & 0 deletions agent/xds/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ func TestAllResourcesFromSnapshot(t *testing.T) {
name: "local-mesh-gateway-with-peered-upstreams",
create: proxycfg.TestConfigSnapshotPeeringLocalMeshGateway,
},
{
name: "hcp-metrics",
create: proxycfg.TestConfigSnapshotHCPMetrics,
},
}
tests = append(tests, getConnectProxyTransparentProxyGoldenTestCases()...)
tests = append(tests, getMeshGatewayPeeringGoldenTestCases()...)
Expand Down
Loading