Backport of Allow HCP metrics collection for Envoy proxies into release/1.15.x (#16611)

Co-authored-by: Ashvitha Sridharan <ashvitha.sridharan@hashicorp.com>
Co-authored-by: Freddy <freddygv@users.noreply.github.com>
Co-authored-by: Ashvitha <ashvitha297@gmail.com>
3 people authored Mar 10, 2023
1 parent 71a3e9f commit 02f8ed4
Showing 17 changed files with 1,293 additions and 37 deletions.
3 changes: 3 additions & 0 deletions .changelog/16585.txt
@@ -0,0 +1,3 @@
```release-note:feature
xds: Allow for configuring connect proxies to send service mesh telemetry to an HCP metrics collection service.
```
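
For orientation, here is a minimal sketch of how a sidecar proxy registration might opt into this feature through the `envoy_hcp_metrics_bind_socket_dir` opaque config key introduced in this change. The service name, port, and directory are illustrative assumptions, not values taken from the commit, and a real deployment would also register the parent service:

```go
package main

import (
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Register a sidecar proxy whose opaque config enables HCP metrics
	// collection. Envoy binds a unix socket under this directory and forwards
	// its metrics to the HCP metrics collector upstream.
	reg := &api.AgentServiceRegistration{
		Kind: api.ServiceKindConnectProxy,
		Name: "web-sidecar-proxy", // illustrative name
		Port: 21000,
		Proxy: &api.AgentServiceConnectProxyConfig{
			DestinationServiceName: "web",
			Config: map[string]interface{}{
				"envoy_hcp_metrics_bind_socket_dir": "/tmp/consul/hcp-metrics",
			},
		},
	}
	if err := client.Agent().ServiceRegister(reg); err != nil {
		log.Fatal(err)
	}
}
```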
71 changes: 71 additions & 0 deletions agent/proxycfg/connect_proxy.go
@@ -3,12 +3,16 @@ package proxycfg
import (
"context"
"fmt"
"path"
"strings"

"github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/mitchellh/mapstructure"
)

type handlerConnectProxy struct {
@@ -103,6 +107,10 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
return snap, err
}

if err := s.maybeInitializeHCPMetricsWatches(ctx, snap); err != nil {
return snap, fmt.Errorf("failed to initialize HCP metrics watches: %w", err)
}

if s.proxyCfg.Mode == structs.ProxyModeTransparent {
// When in transparent proxy we will infer upstreams from intentions with this source
err := s.dataSources.IntentionUpstreams.Notify(ctx, &structs.ServiceSpecificRequest{
@@ -614,3 +622,66 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
}
return nil
}

// hcpMetricsConfig represents the basic opaque config values for pushing telemetry to HCP.
type hcpMetricsConfig struct {
// HCPMetricsBindSocketDir is a string that configures the directory for a
// unix socket where Envoy will forward metrics. These metrics get pushed to
// the HCP Metrics collector to show service mesh metrics on HCP.
HCPMetricsBindSocketDir string `mapstructure:"envoy_hcp_metrics_bind_socket_dir"`
}

func parseHCPMetricsConfig(m map[string]interface{}) (hcpMetricsConfig, error) {
var cfg hcpMetricsConfig
err := mapstructure.WeakDecode(m, &cfg)

if err != nil {
return cfg, fmt.Errorf("failed to decode: %w", err)
}

return cfg, nil
}

// maybeInitializeHCPMetricsWatches will initialize a synthetic upstream and discovery chain
// watch for the HCP metrics collector, if metrics collection is enabled on the proxy registration.
func (s *handlerConnectProxy) maybeInitializeHCPMetricsWatches(ctx context.Context, snap ConfigSnapshot) error {
hcpCfg, err := parseHCPMetricsConfig(s.proxyCfg.Config)
if err != nil {
s.logger.Error("failed to parse connect.proxy.config", "error", err)
}

if hcpCfg.HCPMetricsBindSocketDir == "" {
// Metrics collection is not enabled, return early.
return nil
}

// The path includes the proxy ID so that when multiple proxies are on the same host
// they each have a distinct path to send their metrics.
sock := fmt.Sprintf("%s_%s.sock", s.proxyID.NamespaceOrDefault(), s.proxyID.ID)
path := path.Join(hcpCfg.HCPMetricsBindSocketDir, sock)

upstream := structs.Upstream{
DestinationNamespace: acl.DefaultNamespaceName,
DestinationPartition: s.proxyID.PartitionOrDefault(),
DestinationName: api.HCPMetricsCollectorName,
LocalBindSocketPath: path,
Config: map[string]interface{}{
"protocol": "grpc",
},
}
uid := NewUpstreamID(&upstream)
snap.ConnectProxy.UpstreamConfig[uid] = &upstream

err = s.dataSources.CompiledDiscoveryChain.Notify(ctx, &structs.DiscoveryChainRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Name: upstream.DestinationName,
EvaluateInDatacenter: s.source.Datacenter,
EvaluateInNamespace: uid.NamespaceOrDefault(),
EvaluateInPartition: uid.PartitionOrDefault(),
}, "discovery-chain:"+uid.String(), s.ch)
if err != nil {
return fmt.Errorf("failed to watch discovery chain for %s: %v", uid.String(), err)
}
return nil
}
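
To make the socket naming above concrete, the following is a small standalone sketch, assuming the same `mapstructure` decoding and `path.Join` behavior as the handler; the namespace and proxy ID are hypothetical stand-ins for the values taken from `s.proxyID`:

```go
package main

import (
	"fmt"
	"path"

	"github.com/mitchellh/mapstructure"
)

// Mirrors the unexported hcpMetricsConfig struct in the diff above.
type hcpMetricsConfig struct {
	HCPMetricsBindSocketDir string `mapstructure:"envoy_hcp_metrics_bind_socket_dir"`
}

func main() {
	// Opaque proxy config as it would arrive on the service registration.
	opaque := map[string]interface{}{
		"envoy_hcp_metrics_bind_socket_dir": "/tmp/consul/hcp-metrics/",
	}

	var cfg hcpMetricsConfig
	if err := mapstructure.WeakDecode(opaque, &cfg); err != nil {
		panic(fmt.Errorf("failed to decode: %w", err))
	}

	// The socket name embeds the namespace and proxy ID so that multiple
	// proxies on the same host never collide. Values here are illustrative.
	sock := fmt.Sprintf("%s_%s.sock", "default", "web-sidecar-proxy")
	fmt.Println(path.Join(cfg.HCPMetricsBindSocketDir, sock))
	// Output: /tmp/consul/hcp-metrics/default_web-sidecar-proxy.sock
}
```

Note that `path.Join` cleans the result, so a socket directory configured with or without a trailing slash yields the same socket path, which is why the tests below can use either form.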
181 changes: 171 additions & 10 deletions agent/proxycfg/state_test.go
@@ -15,6 +15,7 @@ import (
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs"
apimod "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/prototest"
"github.com/hashicorp/consul/sdk/testutil"
@@ -455,16 +456,18 @@ func TestState_WatchesAndUpdates(t *testing.T) {

// Used to account for differences in OSS/ent implementations of ServiceID.String()
var (
db = structs.NewServiceName("db", nil)
billing = structs.NewServiceName("billing", nil)
api = structs.NewServiceName("api", nil)
apiA = structs.NewServiceName("api-a", nil)

apiUID = NewUpstreamIDFromServiceName(api)
dbUID = NewUpstreamIDFromServiceName(db)
pqUID = UpstreamIDFromString("prepared_query:query")
extApiUID = NewUpstreamIDFromServiceName(apiA)
extDBUID = NewUpstreamIDFromServiceName(db)
db = structs.NewServiceName("db", nil)
billing = structs.NewServiceName("billing", nil)
api = structs.NewServiceName("api", nil)
apiA = structs.NewServiceName("api-a", nil)
hcpCollector = structs.NewServiceName(apimod.HCPMetricsCollectorName, nil)

apiUID = NewUpstreamIDFromServiceName(api)
dbUID = NewUpstreamIDFromServiceName(db)
pqUID = UpstreamIDFromString("prepared_query:query")
extApiUID = NewUpstreamIDFromServiceName(apiA)
extDBUID = NewUpstreamIDFromServiceName(db)
hcpCollectorUID = NewUpstreamIDFromServiceName(hcpCollector)
)
// TODO(peering): NewUpstreamIDFromServiceName should take a PeerName
extApiUID.Peer = "peer-a"
@@ -3623,6 +3626,164 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
},
},
"hcp-metrics": {
ns: structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-sidecar-proxy",
Service: "web-sidecar-proxy",
Address: "10.0.1.1",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Config: map[string]interface{}{
"envoy_hcp_metrics_bind_socket_dir": "/tmp/consul/hcp-metrics/",
},
},
},
sourceDC: "dc1",
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
fmt.Sprintf("discovery-chain:%s", hcpCollectorUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: hcpCollector.Name,
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
EvaluateInPartition: "default",
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{
Token: aclToken,
},
}),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "should not be valid")

require.Len(t, snap.ConnectProxy.DiscoveryChain, 0, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Len(t, snap.ConnectProxy.WatchedDiscoveryChains, 0, "%+v", snap.ConnectProxy.WatchedDiscoveryChains)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 0, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 0, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
},
},
{
events: []UpdateEvent{
rootWatchEvent(),
{
CorrelationID: peeringTrustBundlesWatchID,
Result: peerTrustBundles,
},
{
CorrelationID: leafWatchID,
Result: issuedCert,
Err: nil,
},
{
CorrelationID: intentionsWatchID,
Result: TestIntentions(),
Err: nil,
},
{
CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{},
},
{
CorrelationID: fmt.Sprintf("discovery-chain:%s", hcpCollectorUID.String()),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, hcpCollector.Name, "default", "default", "dc1", "trustdomain.consul", nil),
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid())
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.ConnectProxy.Leaf)

// An event was received with the HCP collector's discovery chain, which sets up some bookkeeping in the snapshot.
require.Len(t, snap.ConnectProxy.DiscoveryChain, 1, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Contains(t, snap.ConnectProxy.DiscoveryChain, hcpCollectorUID)

require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, hcpCollectorUID)

expectUpstream := structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: apimod.HCPMetricsCollectorName,
LocalBindSocketPath: "/tmp/consul/hcp-metrics/default_web-sidecar-proxy.sock",
Config: map[string]interface{}{
"protocol": "grpc",
},
}
uid := NewUpstreamID(&expectUpstream)

require.Contains(t, snap.ConnectProxy.UpstreamConfig, uid)
require.Equal(t, &expectUpstream, snap.ConnectProxy.UpstreamConfig[uid])

// No endpoints have arrived yet.
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[hcpCollectorUID], 0, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
},
},
{
requiredWatches: map[string]verifyWatchRequest{
fmt.Sprintf("upstream-target:%s.default.default.dc1:", apimod.HCPMetricsCollectorName) + hcpCollectorUID.String(): genVerifyServiceSpecificRequest(apimod.HCPMetricsCollectorName, "", "dc1", true),
},
events: []UpdateEvent{
{
CorrelationID: fmt.Sprintf("upstream-target:%s.default.default.dc1:", apimod.HCPMetricsCollectorName) + hcpCollectorUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Node: &structs.Node{
Node: "node1",
Address: "10.0.0.1",
},
Service: &structs.NodeService{
ID: apimod.HCPMetricsCollectorName,
Service: apimod.HCPMetricsCollectorName,
Port: 8080,
},
},
},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid())
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.ConnectProxy.Leaf)

// Discovery chain for the HCP collector should still be stored in the snapshot.
require.Len(t, snap.ConnectProxy.DiscoveryChain, 1, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Contains(t, snap.ConnectProxy.DiscoveryChain, hcpCollectorUID)

require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, hcpCollectorUID)

// An endpoint arrived for the HCP collector, so it should be present in the snapshot.
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[hcpCollectorUID], 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)

nodes := structs.CheckServiceNodes{
{
Node: &structs.Node{
Node: "node1",
Address: "10.0.0.1",
},
Service: &structs.NodeService{
ID: apimod.HCPMetricsCollectorName,
Service: apimod.HCPMetricsCollectorName,
Port: 8080,
},
},
}
target := fmt.Sprintf("%s.default.default.dc1", apimod.HCPMetricsCollectorName)
require.Equal(t, nodes, snap.ConnectProxy.WatchedUpstreamEndpoints[hcpCollectorUID][target])
},
},
},
},
}

for name, tc := range cases {
50 changes: 50 additions & 0 deletions agent/proxycfg/testing_connect_proxy.go
@@ -1,6 +1,7 @@
package proxycfg

import (
"fmt"
"time"

"github.com/mitchellh/go-testing-interface"
@@ -9,6 +10,7 @@ import (
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/types"
)

@@ -288,3 +290,51 @@ func TestConfigSnapshotGRPCExposeHTTP1(t testing.T) *ConfigSnapshot {
},
})
}

// TestConfigSnapshotHCPMetrics returns a fully populated snapshot for a connect
// proxy that has HCP metrics collection enabled via its opaque config.
func TestConfigSnapshotHCPMetrics(t testing.T) *ConfigSnapshot {
var (
collector = structs.NewServiceName(api.HCPMetricsCollectorName, nil)
collectorUID = NewUpstreamIDFromServiceName(collector)
collectorChain = discoverychain.TestCompileConfigEntries(t, api.HCPMetricsCollectorName, "default", "default", "dc1", connect.TestClusterID+".consul", nil)
)

return TestConfigSnapshot(t, func(ns *structs.NodeService) {
ns.Proxy.Config = map[string]interface{}{
"envoy_hcp_metrics_bind_socket_dir": "/tmp/consul/hcp-metrics",
}
}, []UpdateEvent{
{
CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{
Entry: nil,
},
},
{
CorrelationID: "discovery-chain:" + collectorUID.String(),
Result: &structs.DiscoveryChainResponse{
Chain: collectorChain,
},
},
{
CorrelationID: fmt.Sprintf("upstream-target:%s.default.default.dc1:", api.HCPMetricsCollectorName) + collectorUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: []structs.CheckServiceNode{
{
Node: &structs.Node{
Address: "8.8.8.8",
Datacenter: "dc1",
},
Service: &structs.NodeService{
Service: api.HCPMetricsCollectorName,
Address: "9.9.9.9",
Port: 9090,
},
},
},
},
},
})
}
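
The test helper above wires its events using the correlation-ID conventions that the proxycfg state machine keys watches on. A minimal sketch of composing those IDs follows; the service, namespace, partition, datacenter, and upstream ID string are illustrative assumptions, and the real upstream ID comes from `UpstreamID.String()`:

```go
package main

import "fmt"

func main() {
	const (
		service    = "hcp-metrics-collector" // illustrative collector name
		namespace  = "default"
		partition  = "default"
		datacenter = "dc1"
		uid        = "hcp-metrics-collector" // simplified upstream ID string
	)

	// Discovery chain watches are keyed by "discovery-chain:<upstream id>".
	chainID := "discovery-chain:" + uid

	// Endpoint watches are keyed by
	// "upstream-target:<service>.<namespace>.<partition>.<datacenter>:<upstream id>".
	targetID := fmt.Sprintf("upstream-target:%s.%s.%s.%s:%s",
		service, namespace, partition, datacenter, uid)

	fmt.Println(chainID)
	fmt.Println(targetID)
}
```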
4 changes: 4 additions & 0 deletions agent/xds/resources_test.go
@@ -166,6 +166,10 @@ func TestAllResourcesFromSnapshot(t *testing.T) {
name: "local-mesh-gateway-with-peered-upstreams",
create: proxycfg.TestConfigSnapshotPeeringLocalMeshGateway,
},
{
name: "hcp-metrics",
create: proxycfg.TestConfigSnapshotHCPMetrics,
},
}
tests = append(tests, getConnectProxyTransparentProxyGoldenTestCases()...)
tests = append(tests, getMeshGatewayPeeringGoldenTestCases()...)