diff --git a/agent/hcp/client/metrics_client.go b/agent/hcp/client/metrics_client.go new file mode 100644 index 000000000000..15bd71097f79 --- /dev/null +++ b/agent/hcp/client/metrics_client.go @@ -0,0 +1,152 @@ +package client + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "time" + + "github.com/hashicorp/go-cleanhttp" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-retryablehttp" + hcpcfg "github.com/hashicorp/hcp-sdk-go/config" + colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" + "golang.org/x/oauth2" + "google.golang.org/protobuf/proto" +) + +const ( + // HTTP Client config + defaultStreamTimeout = 15 * time.Second + + // Retry config + // TODO: Evenutally, we'd like to configure these values dynamically. + defaultRetryWaitMin = 1 * time.Second + defaultRetryWaitMax = 15 * time.Second + defaultRetryMax = 4 +) + +// MetricsClient exports Consul metrics in OTLP format to the HCP Telemetry Gateway. +type MetricsClient interface { + ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error +} + +// cloudConfig represents cloud config for TLS abstracted in an interface for easy testing. +type CloudConfig interface { + HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) +} + +// otlpClient is an implementation of MetricsClient with a retryable http client for retries and to honor throttle. +// It also holds default HTTP headers to add to export requests. +type otlpClient struct { + client *retryablehttp.Client + header *http.Header +} + +// NewMetricsClient returns a configured MetricsClient. +// The current implementation uses otlpClient to provide retry functionality. +func NewMetricsClient(cfg CloudConfig, ctx context.Context) (MetricsClient, error) { + if cfg == nil { + return nil, fmt.Errorf("failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)") + } + + if ctx == nil { + return nil, fmt.Errorf("failed to init telemetry client: provide a valid context") + } + + logger := hclog.FromContext(ctx) + + c, err := newHTTPClient(cfg, logger) + if err != nil { + return nil, fmt.Errorf("failed to init telemetry client: %v", err) + } + + header := make(http.Header) + header.Set("Content-Type", "application/x-protobuf") + + return &otlpClient{ + client: c, + header: &header, + }, nil +} + +// newHTTPClient configures the retryable HTTP client. +func newHTTPClient(cloudCfg CloudConfig, logger hclog.Logger) (*retryablehttp.Client, error) { + hcpCfg, err := cloudCfg.HCPConfig() + if err != nil { + return nil, err + } + + tlsTransport := cleanhttp.DefaultPooledTransport() + tlsTransport.TLSClientConfig = hcpCfg.APITLSConfig() + + var transport http.RoundTripper = &oauth2.Transport{ + Base: tlsTransport, + Source: hcpCfg, + } + + client := &http.Client{ + Transport: transport, + Timeout: defaultStreamTimeout, + } + + retryClient := &retryablehttp.Client{ + HTTPClient: client, + Logger: logger.Named("hcp_telemetry_client"), + RetryWaitMin: defaultRetryWaitMin, + RetryWaitMax: defaultRetryWaitMax, + RetryMax: defaultRetryMax, + CheckRetry: retryablehttp.DefaultRetryPolicy, + Backoff: retryablehttp.DefaultBackoff, + } + + return retryClient, nil +} + +// ExportMetrics is the single method exposed by MetricsClient to export OTLP metrics to the desired HCP endpoint. +// The endpoint is configurable as the endpoint can change during periodic refresh of CCM telemetry config. +// By configuring the endpoint here, we can re-use the same client and override the endpoint when making a request. +func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error { + pbRequest := &colmetricpb.ExportMetricsServiceRequest{ + ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics}, + } + + body, err := proto.Marshal(pbRequest) + if err != nil { + return fmt.Errorf("failed to export metrics: %v", err) + } + + req, err := retryablehttp.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(body)) + if err != nil { + return fmt.Errorf("failed to export metrics: %v", err) + } + req.Header = *o.header + + resp, err := o.client.Do(req.WithContext(ctx)) + if err != nil { + return fmt.Errorf("failed to export metrics: %v", err) + } + defer resp.Body.Close() + + var respData bytes.Buffer + if _, err := io.Copy(&respData, resp.Body); err != nil { + return fmt.Errorf("failed to export metrics: %v", err) + } + + if respData.Len() != 0 { + var respProto colmetricpb.ExportMetricsServiceResponse + if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil { + return fmt.Errorf("failed to export metrics: %v", err) + } + + if respProto.PartialSuccess != nil { + msg := respProto.PartialSuccess.GetErrorMessage() + return fmt.Errorf("failed to export metrics: partial success: %s", msg) + } + } + + return nil +} diff --git a/agent/hcp/client/metrics_client_test.go b/agent/hcp/client/metrics_client_test.go new file mode 100644 index 000000000000..7c64d731d0b1 --- /dev/null +++ b/agent/hcp/client/metrics_client_test.go @@ -0,0 +1,107 @@ +package client + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + colpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" + "google.golang.org/protobuf/proto" +) + +func TestNewMetricsClient(t *testing.T) { + for name, test := range map[string]struct { + wantErr string + cfg CloudConfig + ctx context.Context + }{ + "success": { + cfg: &MockCloudCfg{}, + ctx: context.Background(), + }, + "failsWithoutCloudCfg": { + wantErr: "failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)", + cfg: nil, + ctx: context.Background(), + }, + "failsWithoutContext": { + wantErr: "failed to init telemetry client: provide a valid context", + cfg: MockCloudCfg{}, + ctx: nil, + }, + "failsHCPConfig": { + wantErr: "failed to init telemetry client", + cfg: MockErrCloudCfg{}, + ctx: context.Background(), + }, + } { + t.Run(name, func(t *testing.T) { + client, err := NewMetricsClient(test.cfg, test.ctx) + if test.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), test.wantErr) + return + } + + require.Nil(t, err) + require.NotNil(t, client) + }) + } +} + +func TestExportMetrics(t *testing.T) { + for name, test := range map[string]struct { + wantErr string + status int + }{ + "success": { + status: http.StatusOK, + }, + "failsWithNonRetryableError": { + status: http.StatusBadRequest, + wantErr: "failed to export metrics", + }, + } { + t.Run(name, func(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, r.Header.Get("Content-Type"), "application/x-protobuf") + + require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token") + + body := colpb.ExportMetricsServiceResponse{} + + if test.wantErr != "" { + body.PartialSuccess = &colpb.ExportMetricsPartialSuccess{ + ErrorMessage: "partial failure", + } + } + bytes, err := proto.Marshal(&body) + + require.NoError(t, err) + + w.Header().Set("Content-Type", "application/x-protobuf") + w.WriteHeader(test.status) + w.Write(bytes) + })) + defer srv.Close() + + client, err := NewMetricsClient(MockCloudCfg{}, context.Background()) + require.NoError(t, err) + + ctx := context.Background() + metrics := &metricpb.ResourceMetrics{} + err = client.ExportMetrics(ctx, metrics, srv.URL) + + if test.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), test.wantErr) + return + } + + require.NoError(t, err) + }) + } +} diff --git a/agent/hcp/client/mock_CloudConfig.go b/agent/hcp/client/mock_CloudConfig.go new file mode 100644 index 000000000000..ed6e3358f7db --- /dev/null +++ b/agent/hcp/client/mock_CloudConfig.go @@ -0,0 +1,40 @@ +package client + +import ( + "crypto/tls" + "errors" + "net/url" + + hcpcfg "github.com/hashicorp/hcp-sdk-go/config" + "golang.org/x/oauth2" +) + +type mockHCPCfg struct{} + +func (m *mockHCPCfg) Token() (*oauth2.Token, error) { + return &oauth2.Token{ + AccessToken: "test-token", + }, nil +} + +func (m *mockHCPCfg) APITLSConfig() *tls.Config { return nil } + +func (m *mockHCPCfg) SCADAAddress() string { return "" } + +func (m *mockHCPCfg) SCADATLSConfig() *tls.Config { return &tls.Config{} } + +func (m *mockHCPCfg) APIAddress() string { return "" } + +func (m *mockHCPCfg) PortalURL() *url.URL { return &url.URL{} } + +type MockCloudCfg struct{} + +func (m MockCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) { + return &mockHCPCfg{}, nil +} + +type MockErrCloudCfg struct{} + +func (m MockErrCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) { + return nil, errors.New("test bad HCP config") +} diff --git a/go.mod b/go.mod index d78707f8c9a6..b3f1a2faa353 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,7 @@ require ( github.com/hashicorp/go-memdb v1.3.4 github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/go-raftchunking v0.7.0 + github.com/hashicorp/go-retryablehttp v0.6.7 github.com/hashicorp/go-secure-stdlib/awsutil v0.1.6 github.com/hashicorp/go-sockaddr v1.0.2 github.com/hashicorp/go-syslog v1.0.0 @@ -95,6 +96,7 @@ require ( github.com/shirou/gopsutil/v3 v3.22.8 github.com/stretchr/testify v1.8.2 go.etcd.io/bbolt v1.3.6 + go.opentelemetry.io/proto/otlp v0.19.0 go.uber.org/goleak v1.1.10 golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d golang.org/x/net v0.7.0 @@ -167,11 +169,11 @@ require ( github.com/googleapis/gax-go/v2 v2.1.0 // indirect github.com/googleapis/gnostic v0.2.0 // indirect github.com/gophercloud/gophercloud v0.3.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-msgpack v0.5.5 // indirect github.com/hashicorp/go-msgpack/v2 v2.0.0 // indirect github.com/hashicorp/go-plugin v1.4.5 // indirect - github.com/hashicorp/go-retryablehttp v0.6.7 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect github.com/hashicorp/go-secure-stdlib/mlock v0.1.1 // indirect github.com/hashicorp/go-secure-stdlib/parseutil v0.1.6 // indirect @@ -225,7 +227,6 @@ require ( github.com/yusufpapurcu/wmi v1.2.2 // indirect go.mongodb.org/mongo-driver v1.10.0 // indirect go.opencensus.io v0.23.0 // indirect - go.opentelemetry.io/proto/otlp v0.7.0 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect diff --git a/go.sum b/go.sum index 964f00fa91e0..2d08b9447dfe 100644 --- a/go.sum +++ b/go.sum @@ -193,8 +193,10 @@ github.com/cloudflare/cloudflare-go v0.10.2/go.mod h1:qhVI5MKwBGhdNU89ZRz2plgYut github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1 h1:zH8ljVhhq7yC0MIeUL/IviMtY8hx2mK8cN9wEYb8ggw= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= @@ -387,6 +389,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= +github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -507,6 +511,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 h1:BZHcxBETFHIdVyhyEfOvn/RdU/QGdLI4y34qQGjGWO0= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks= github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI= github.com/hashicorp/consul-awsauth v0.0.0-20220713182709-05ac1c5c2706 h1:1ZEjnveDe20yFa6lSkfdQZm5BR/b271n0MsB5R2L3us= @@ -1068,8 +1074,9 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= -go.opentelemetry.io/proto/otlp v0.7.0 h1:rwOQPCuKAKmwGKq2aVNnYIibI6wnV7EvzgfTCzcdGg8= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= +go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= @@ -1220,6 +1227,7 @@ golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= +golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= golang.org/x/oauth2 v0.0.0-20220909003341-f21342109be1 h1:lxqLZaMad/dJHMFZH0NiNpiEZI/nhgWhe4wgzpE+MuA= golang.org/x/oauth2 v0.0.0-20220909003341-f21342109be1/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg= @@ -1528,6 +1536,7 @@ google.golang.org/genproto v0.0.0-20210828152312-66f60bf46e71/go.mod h1:eFjDcFEc google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20220921223823-23cae91e6737 h1:K1zaaMdYBXRyX+cwFnxj7M6zwDyumLQMZ5xqwGvjreQ= google.golang.org/genproto v0.0.0-20220921223823-23cae91e6737/go.mod h1:2r/26NEF3bFmT3eC3aZreahSal0C3Shl8Gi6vyDYqOQ= google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= @@ -1561,6 +1570,7 @@ google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnD google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k= +google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw= google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=