From 000ee255a2682d8ca5e99b7f8cafa91da5d13d26 Mon Sep 17 00:00:00 2001 From: Mike Morris Date: Wed, 9 Nov 2022 13:30:34 -0500 Subject: [PATCH 1/5] swap to gRPC WatchRoots impl deps: hashicorp/consul/proto-public plumb through Consul address and gRPC port config, add trace logging disambiguate grpcClient and apiClient in certmanager add consul-grpc-port flag to controller, fix TLS config for gRPC connection plumb gRPC TLS config through deployment exec command pass gRPC config to NewCertManager in tests add support for certmanager insecure gRPC conn using WithTransportCredentials append Consul ACL token to gRPC calls --- go.mod | 1 + go.sum | 2 + internal/commands/exec/exec.go | 21 +++- internal/commands/server/command.go | 5 +- internal/commands/server/server.go | 12 ++ internal/consul/certmanager.go | 186 +++++++++++++++++++--------- internal/consul/certmanager_test.go | 49 ++++++-- internal/testing/e2e/consul.go | 33 +++-- internal/testing/e2e/gateway.go | 15 ++- 9 files changed, 248 insertions(+), 76 deletions(-) diff --git a/go.mod b/go.mod index ad94cdef7..f85bc631a 100644 --- a/go.mod +++ b/go.mod @@ -150,6 +150,7 @@ require ( github.com/gostaticanalysis/comment v1.4.2 // indirect github.com/gostaticanalysis/forcetypeassert v0.1.0 // indirect github.com/gostaticanalysis/nilerr v0.1.1 // indirect + github.com/hashicorp/consul/proto-public v0.1.1 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect diff --git a/go.sum b/go.sum index 124ef0fcd..70242278a 100644 --- a/go.sum +++ b/go.sum @@ -680,6 +680,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/consul/api v1.15.3 h1:WYONYL2rxTXtlekAqblR2SCdJsizMDIj/uXb5wNy9zU= github.com/hashicorp/consul/api v1.15.3/go.mod h1:/g/qgcoBcEXALCNZgRRisyTW0nY86++L0KbeAMXYCeY= +github.com/hashicorp/consul/proto-public v0.1.1 h1:C28d6xX+DwoD2dSex/kwD0/nJ4XNgoBtQXtZEm4FGEk= +github.com/hashicorp/consul/proto-public v0.1.1/go.mod h1:vs2KkuWwtjkIgA5ezp4YKPzQp4GitV+q/+PvksrA92k= github.com/hashicorp/consul/sdk v0.11.0 h1:HRzj8YSCln2yGgCumN5CL8lYlD3gBurnervJRJAZyC4= github.com/hashicorp/consul/sdk v0.11.0/go.mod h1:yPkX5Q6CsxTFMjQQDJwzeNmUUF5NUGGbrDsv9wTb8cw= github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= diff --git a/internal/commands/exec/exec.go b/internal/commands/exec/exec.go index ef2ae1407..43646d6aa 100644 --- a/internal/commands/exec/exec.go +++ b/internal/commands/exec/exec.go @@ -16,6 +16,8 @@ import ( "github.com/hashicorp/consul-api-gateway/internal/consul" "github.com/hashicorp/consul-api-gateway/internal/envoy" + + "google.golang.org/grpc/metadata" ) type AuthConfig struct { @@ -78,6 +80,7 @@ func RunExec(config ExecConfig) (ret int) { // First do the ACL Login, if necessary. var consulClient *api.Client + var consulConfig *api.Config var token string var err error if config.AuthConfig.Method != "" { @@ -89,7 +92,7 @@ func RunExec(config ExecConfig) (ret int) { } config.Logger.Trace("consul login complete") } else { - consulConfig := &config.ConsulConfig + consulConfig = &config.ConsulConfig consulConfig.Namespace = config.GatewayConfig.Namespace consulClient, err = api.NewClient(consulConfig) if err != nil { @@ -156,8 +159,19 @@ func RunExec(config ExecConfig) (ret int) { if config.EnvoyConfig.CertificateDirectory != "" { options.Directory = config.EnvoyConfig.CertificateDirectory } + + tlsConfig, err := api.SetupTLSConfig(&config.ConsulConfig.TLSConfig) + if err != nil { + return 1 + } + certManager := consul.NewCertManager( config.Logger.Named("cert-manager"), + consul.Config{ + Addresses: []string{config.EnvoyConfig.XDSAddress}, + GRPCPort: config.EnvoyConfig.XDSPort, + TLS: tlsConfig, + }, client, config.GatewayConfig.Name, options, @@ -174,6 +188,11 @@ func RunExec(config ExecConfig) (ret int) { } group, groupCtx := errgroup.WithContext(ctx) + + // Append token with service:write permissions to allow certmanager to + // watch roots and sign CSRs + groupCtx = metadata.AppendToOutgoingContext(groupCtx, "x-consul-token", token) + group.Go(func() error { return certManager.Manage(groupCtx) }) diff --git a/internal/commands/server/command.go b/internal/commands/server/command.go index e9a2424e9..8163b67f5 100644 --- a/internal/commands/server/command.go +++ b/internal/commands/server/command.go @@ -36,7 +36,8 @@ type Command struct { flagCASecret string // CA Secret for Consul server flagCASecretNamespace string // CA Secret namespace for Consul server - flagConsulAddress string // Consul server address + flagConsulAddress string // Consul server address + flagConsulGRPCPort int // Consul gRPC port flagPrimaryDatacenter string // Primary datacenter, may or may not be the datacenter this controller is running in @@ -71,6 +72,7 @@ func (c *Command) init() { c.flagSet.StringVar(&c.flagCASecret, "ca-secret", "", "CA Secret for Consul server.") c.flagSet.StringVar(&c.flagCASecretNamespace, "ca-secret-namespace", "default", "CA Secret namespace for Consul server.") c.flagSet.StringVar(&c.flagConsulAddress, "consul-address", "", "Consul Address.") + c.flagSet.IntVar(&c.flagConsulGRPCPort, "consul-grpc-port", 8502, "Consul gRPC Port.") c.flagSet.StringVar(&c.flagPrimaryDatacenter, "primary-datacenter", "", "Name of the primary Consul datacenter") c.flagSet.StringVar(&c.flagSDSServerHost, "sds-server-host", defaultSDSServerHost, "SDS Server Host.") c.flagSet.StringVar(&c.flagK8sContext, "k8s-context", "", "Kubernetes context to use.") @@ -170,6 +172,7 @@ func (c *Command) Run(args []string) int { Context: context.Background(), Logger: logger, ConsulConfig: consulCfg, + ConsulGRPCPort: c.flagConsulGRPCPort, K8sConfig: cfg, ProfilingPort: c.flagPprofPort, MetricsPort: c.flagMetricsPort, diff --git a/internal/commands/server/server.go b/internal/commands/server/server.go index 855d9f500..4f4bb713c 100644 --- a/internal/commands/server/server.go +++ b/internal/commands/server/server.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "os/signal" + "strings" "syscall" "golang.org/x/sync/errgroup" @@ -27,6 +28,7 @@ type ServerConfig struct { Context context.Context Logger hclog.Logger ConsulConfig *api.Config + ConsulGRPCPort int K8sConfig *k8s.Config ProfilingPort int MetricsPort int @@ -75,8 +77,18 @@ func RunServer(config ServerConfig) int { options := consul.DefaultCertManagerOptions() options.PrimaryDatacenter = config.PrimaryDatacenter + tlsConfig, err := api.SetupTLSConfig(&config.ConsulConfig.TLSConfig) + if err != nil { + return 1 + } + certManager := consul.NewCertManager( config.Logger.Named("cert-manager"), + consul.Config{ + Addresses: []string{strings.Split(config.ConsulConfig.Address, ":")[0]}, + GRPCPort: config.ConsulGRPCPort, + TLS: tlsConfig, + }, client, "consul-api-gateway-controller", options, diff --git a/internal/consul/certmanager.go b/internal/consul/certmanager.go index 6f0563d1a..6c23e595e 100644 --- a/internal/consul/certmanager.go +++ b/internal/consul/certmanager.go @@ -11,7 +11,6 @@ import ( "path" "sync" "text/template" - "time" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api/watch" @@ -19,6 +18,12 @@ import ( "github.com/hashicorp/consul-api-gateway/internal/common" "github.com/hashicorp/consul-api-gateway/internal/metrics" + + "github.com/hashicorp/consul/proto-public/pbconnectca" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" ) const ( @@ -38,6 +43,15 @@ var ( sdsCAConfigTemplate = template.New("sdsCA") ) +type Config struct { + Addresses []string + GRPCPort int + Namespace string + Partition string + Datacenter string + TLS *tls.Config +} + type sdsClusterArgs struct { Name string CertSDSConfigPath string @@ -95,8 +109,10 @@ type certWriter func() error // Once a leaf certificate has expired, it generates a new certificate and writes // it to the location given in the configuration options with which it was created. type CertManager struct { - client Client - logger hclog.Logger + cfg Config + apiClient Client + grpcClient pbconnectca.ConnectCAServiceClient + logger hclog.Logger service string directory string @@ -105,20 +121,21 @@ type CertManager struct { sdsAddress string sdsPort int - lock sync.RWMutex + mutex sync.RWMutex signalled bool initializeSignal chan struct{} // cached values ca []byte + trustDomain string certificate []byte privateKey []byte tlsCertificate *tls.Certificate rootCertificatePool *x509.CertPool // watches - rootWatch *watch.Plan + // rootWatch *watch.Plan leafWatch *watch.Plan // these can be overwritten to modify retry logic in testing @@ -127,12 +144,13 @@ type CertManager struct { } // NewCertManager creates a new CertManager instance. -func NewCertManager(logger hclog.Logger, client Client, service string, options *CertManagerOptions) *CertManager { +func NewCertManager(logger hclog.Logger, cfg Config, apiClient Client, service string, options *CertManagerOptions) *CertManager { if options == nil { options = DefaultCertManagerOptions() } manager := &CertManager{ - client: client, + cfg: cfg, + apiClient: apiClient, logger: logger, primaryDatacenter: options.PrimaryDatacenter, sdsAddress: options.SDSAddress, @@ -146,28 +164,65 @@ func NewCertManager(logger hclog.Logger, client Client, service string, options return manager } -func (c *CertManager) handleRootWatch(blockParam watch.BlockingParamVal, raw interface{}) { - if raw == nil { - c.logger.Error("received nil interface") - return +func (c *CertManager) watchRoots(ctx context.Context, rotatedRootCh chan *pbconnectca.WatchRootsResponse) error { + c.logger.Trace("starting CA roots watch stream") + + // This doesn't appear to allow specifying a primary datacenter, unclear if + // it gets forwarded automatically or the primary must be dialed directly. + stream, err := c.grpcClient.WatchRoots(ctx, &pbconnectca.WatchRootsRequest{}) + if err != nil { + c.logger.Error(err.Error()) + return err } - v, ok := raw.(*api.CARootList) - if !ok || v == nil { - c.logger.Error("got invalid response from root watch") - return + for { + root, err := stream.Recv() + if err != nil { + c.logger.Error(err.Error()) + return err + } + select { + case rotatedRootCh <- root: + case <-ctx.Done(): + return nil + } } +} - c.lock.Lock() - defer c.lock.Unlock() +func (c *CertManager) handleRootWatch(ctx context.Context, response *pbconnectca.WatchRootsResponse) { + c.logger.Trace("handling CA roots response") + + // TODO: this shouldn't ever really happen? + if response == nil { + c.logger.Error("received nil interface") + return // ignore + } + + c.mutex.Lock() + defer c.mutex.Unlock() + + // Set trust domain + c.trustDomain = response.TrustDomain + c.logger.Trace("setting trust domain") roots := x509.NewCertPool() - for _, root := range v.Roots { - roots.AppendCertsFromPEM([]byte(root.RootCertPEM)) - if root.Active { - c.ca = []byte(root.RootCertPEM) + id := response.ActiveRootId + foundActiveRoot := false + for _, root := range response.Roots { + // Add all roots, including non-active, to root certificate pool + roots.AppendCertsFromPEM([]byte(root.RootCert)) + + // Set active root as CA + if root.Id == id { + c.logger.Trace("found active root") + foundActiveRoot = true + c.ca = []byte(root.RootCert) } } + if !foundActiveRoot { + c.logger.Error("active root not found from root watch") + } + c.rootCertificatePool = roots if err := c.writeCerts(); err != nil { @@ -196,8 +251,8 @@ func (c *CertManager) handleLeafWatch(blockParam watch.BlockingParamVal, raw int metrics.Registry.IncrCounter(metrics.ConsulLeafCertificateFetches, 1) - c.lock.Lock() - defer c.lock.Unlock() + c.mutex.Lock() + defer c.mutex.Unlock() c.certificate = []byte(v.CertPEM) c.privateKey = []byte(v.PrivateKeyPEM) @@ -215,15 +270,45 @@ func (c *CertManager) handleLeafWatch(blockParam watch.BlockingParamVal, raw int func (c *CertManager) Manage(ctx context.Context) error { c.logger.Trace("running cert manager") - rootWatch, err := watch.Parse(map[string]interface{}{ - "datacenter": c.primaryDatacenter, - "type": "connect_roots", - }) + grpcAddress := fmt.Sprintf("%s:%d", c.cfg.Addresses[0], c.cfg.GRPCPort) + + c.logger.Trace("dialing " + grpcAddress) + + // Default to insecure credentials unless TLS config has been provided + tlsCredentials := insecure.NewCredentials() + if c.cfg.TLS != nil { + tlsCredentials = credentials.NewTLS(c.cfg.TLS) + } + + conn, err := grpc.DialContext(ctx, grpcAddress, grpc.WithTransportCredentials(tlsCredentials)) if err != nil { + c.logger.Error(err.Error()) return err } - c.rootWatch = rootWatch - c.rootWatch.HybridHandler = c.handleRootWatch + + c.grpcClient = pbconnectca.NewConnectCAServiceClient(conn) + + // TODO: should this move to a field on the CertManager struct? + rotatedRootCh := make(chan *pbconnectca.WatchRootsResponse) + + // TODO: does this need to be stopped/cleaned up at some point, or will + // the context handle that? + // TODO: is there a good reason to wrap these in an errgroup? + go c.watchRoots(ctx, rotatedRootCh) + + // Don't try to pull from the channel until after the watch has started + go func() error { + for { + select { + case r := <-rotatedRootCh: + c.handleRootWatch(ctx, r) + // TODO: generate and sign new leaf certificate + // expiration, err = c.fetchCert(ctx) + case <-ctx.Done(): + return nil + } + } + }() leafWatch, err := watch.Parse(map[string]interface{}{ "type": "connect_leaf", @@ -236,34 +321,17 @@ func (c *CertManager) Manage(ctx context.Context) error { c.leafWatch.HybridHandler = c.handleLeafWatch wrapWatch := func(w *watch.Plan) { - if err := w.RunWithClientAndHclog(c.client.Internal(), c.logger); err != nil { + if err := w.RunWithClientAndHclog(c.apiClient.Internal(), c.logger); err != nil { c.logger.Error("consul watch.Plan returned unexpectedly", "error", err) } c.logger.Trace("consul watch.Plan stopped") } - // Consul 1.11 has a bug where blocking queries on the leaf certificate endpoint - // cause all subsequent non-blocking queries to unexpectedly block. The problem - // is that this means that, on restart, the query for a leaf certificate with - // the given service id will never return until the previous leaf certificate - // expires/is rotated. Adding a wait here causes the API to return once the timeout has - // been hit -- allowing us to short-circuit the buggy blocking. The subsequent - // goroutines can then be leveraged to pick up any certificate rotations. - if !c.skipExtraFetch { - leafCert, _, err := c.client.Agent().ConnectCALeaf(c.service, &api.QueryOptions{ - WaitTime: 1 * time.Second, - }) - if err != nil { - c.logger.Error("error grabbing leaf certificate", "error", err) - return err - } - c.handleLeafWatch(nil, leafCert) - } - go wrapWatch(c.rootWatch) + // go wrapWatch(c.rootWatch) go wrapWatch(c.leafWatch) <-ctx.Done() - c.rootWatch.Stop() + // c.rootWatch.Stop() c.leafWatch.Stop() return nil @@ -315,40 +383,40 @@ func (c *CertManager) signal() { // RootCA returns the current CA cert func (c *CertManager) RootCA() []byte { - c.lock.RLock() - defer c.lock.RUnlock() + c.mutex.RLock() + defer c.mutex.RUnlock() return c.ca } // RootPool returns the certificate pool for the connect root CA func (c *CertManager) RootPool() *x509.CertPool { - c.lock.RLock() - defer c.lock.RUnlock() + c.mutex.RLock() + defer c.mutex.RUnlock() return c.rootCertificatePool } // Certificate returns the current leaf cert func (c *CertManager) Certificate() []byte { - c.lock.RLock() - defer c.lock.RUnlock() + c.mutex.RLock() + defer c.mutex.RUnlock() return c.certificate } // PrivateKey returns the current leaf cert private key func (c *CertManager) PrivateKey() []byte { - c.lock.RLock() - defer c.lock.RUnlock() + c.mutex.RLock() + defer c.mutex.RUnlock() return c.privateKey } // TLSCertificate returns the current leaf certificate as a parsed structure func (c *CertManager) TLSCertificate() *tls.Certificate { - c.lock.RLock() - defer c.lock.RUnlock() + c.mutex.RLock() + defer c.mutex.RUnlock() return c.tlsCertificate } diff --git a/internal/consul/certmanager_test.go b/internal/consul/certmanager_test.go index abee0097a..73e4754de 100644 --- a/internal/consul/certmanager_test.go +++ b/internal/consul/certmanager_test.go @@ -49,7 +49,17 @@ func TestManage(t *testing.T) { options := DefaultCertManagerOptions() options.Directory = directory - manager := NewCertManager(hclog.NewNullLogger(), NewClient(server.consul), service, options) + manager := NewCertManager( + hclog.NewNullLogger(), + Config{ + Addresses: []string{server.consulAddress}, + GRPCPort: 8502, + TLS: nil, + }, + NewClient(server.consulClient), + service, + options, + ) manager.skipExtraFetch = true ctx, cancel := context.WithCancel(context.Background()) @@ -108,7 +118,17 @@ func TestManage_Refresh(t *testing.T) { server := runCertServer(t, 0, 0, service, 2) options := DefaultCertManagerOptions() - manager := NewCertManager(hclog.NewNullLogger(), NewClient(server.consul), service, options) + manager := NewCertManager( + hclog.NewNullLogger(), + Config{ + Addresses: []string{server.consulAddress}, + GRPCPort: 8502, + TLS: nil, + }, + NewClient(server.consulClient), + service, + options, + ) manager.skipExtraFetch = true writes := int32(0) @@ -156,12 +176,19 @@ func TestManage_WaitCancel(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) defer cancel() - err := NewCertManager(hclog.NewNullLogger(), nil, "", nil).WaitForWrite(ctx) + err := NewCertManager( + hclog.NewNullLogger(), + Config{}, + nil, + "", + nil, + ).WaitForWrite(ctx) require.Error(t, err) } type certServer struct { - consul *api.Client + consulAddress string + consulClient *api.Client fakeRootCertPEM string fakeClientCert string @@ -250,11 +277,13 @@ func runCertServer(t *testing.T, leafFailures, rootFailures uint64, service stri serverURL, err := url.Parse(consulServer.URL) require.NoError(t, err) - clientConfig := &api.Config{Address: serverURL.String()} + consulHTTPAddress := serverURL.String() + clientConfig := &api.Config{Address: consulHTTPAddress} client, err := api.NewClient(clientConfig) require.NoError(t, err) - server.consul = client + server.consulAddress = strings.Split(consulHTTPAddress, ":")[0] + server.consulClient = client return server } @@ -316,7 +345,13 @@ func TestRenderSDS(t *testing.T) { options := DefaultCertManagerOptions() options.Directory = "/certs" - manager := NewCertManager(hclog.NewNullLogger(), nil, gwTesting.RandomString(), options) + manager := NewCertManager( + hclog.NewNullLogger(), + Config{}, + nil, + gwTesting.RandomString(), + options, + ) manager.configDirectory = directory config, err := manager.RenderSDSConfig() diff --git a/internal/testing/e2e/consul.go b/internal/testing/e2e/consul.go index 356bd2cbe..d172f767c 100644 --- a/internal/testing/e2e/consul.go +++ b/internal/testing/e2e/consul.go @@ -3,6 +3,7 @@ package e2e import ( "bytes" "context" + "crypto/tls" "errors" "fmt" "html/template" @@ -88,6 +89,7 @@ func init() { type consulTestEnvironment struct { ca []byte + consulTLSConfig *tls.Config consulClient *api.Client token string policy *api.ACLPolicy @@ -174,14 +176,21 @@ func CreateTestConsulContainer(name, namespace string) env.Func { return nil, err } + consulAPITLSConfig := api.TLSConfig{ + CAPem: rootCA.CertBytes, + CertPEM: clientCert.CertBytes, + KeyPEM: clientCert.PrivateKeyBytes, + } + + consulTLSConfig, err := api.SetupTLSConfig(&consulAPITLSConfig) + if err != nil { + return nil, err + } + consulClient, err := api.NewClient(&api.Config{ - Address: fmt.Sprintf("localhost:%d", httpsPort), - Scheme: "https", - TLSConfig: api.TLSConfig{ - CAPem: rootCA.CertBytes, - CertPEM: clientCert.CertBytes, - KeyPEM: clientCert.PrivateKeyBytes, - }, + Address: fmt.Sprintf("localhost:%d", httpsPort), + Scheme: "https", + TLSConfig: consulAPITLSConfig, }) if err != nil { return nil, err @@ -210,6 +219,7 @@ func CreateTestConsulContainer(name, namespace string) env.Func { env := &consulTestEnvironment{ ca: rootCA.CertBytes, + consulTLSConfig: consulTLSConfig, consulClient: consulClient, httpPort: httpsPort, httpFlattenedPort: httpFlattenedPort, @@ -260,15 +270,20 @@ func consulGRPCVarName() string { tagTokens := strings.Split(consulImage, ":") tag := tagTokens[len(tagTokens)-1] imageVersion, err := semver.NewVersion(tag) + log.Printf("Consul image version: %s", imageVersion.String()) if err != nil { + log.Printf("Error parsing Consul image version: %s", err.Error()) return "grpc" } breakingVersion, err := semver.NewVersion(grpcConsulIncompatibleNameVersion) + log.Printf("Consul breaking version: %s", breakingVersion.String()) if err != nil { + log.Printf("Error parsing Consul breaking version: %s", breakingVersion.String()) return "grpc" } // we check major/minor directly since the semver check fails with the trailing -dev tag if breakingVersion.Major() > imageVersion.Major() || (breakingVersion.Major() == imageVersion.Major() && breakingVersion.Minor() > imageVersion.Minor()) { + log.Print("Breaking version greater than image version") return "grpc" } return "grpc_tls" @@ -443,6 +458,10 @@ func consulDeployment(namespace string, httpsPort, grpcPort int) *apps.Deploymen } } +func ConsulTLSConfig(ctx context.Context) *tls.Config { + return mustGetTestEnvironment(ctx).consulTLSConfig +} + func ConsulClient(ctx context.Context) *api.Client { return mustGetTestEnvironment(ctx).consulClient } diff --git a/internal/testing/e2e/gateway.go b/internal/testing/e2e/gateway.go index 99290c5df..7b86680ce 100644 --- a/internal/testing/e2e/gateway.go +++ b/internal/testing/e2e/gateway.go @@ -18,6 +18,8 @@ import ( "github.com/hashicorp/consul-api-gateway/internal/k8s" "github.com/hashicorp/consul-api-gateway/internal/k8s/utils" "github.com/hashicorp/consul-api-gateway/internal/store" + + "google.golang.org/grpc/metadata" ) type gatewayTestContext struct{} @@ -82,6 +84,11 @@ func (p *gatewayTestEnvironment) run(ctx context.Context, namespace string, cfg certManagerOptions.Directory = p.directory certManager := consul.NewCertManager( nullLogger, + consul.Config{ + Addresses: []string{"localhost"}, + GRPCPort: ConsulGRPCPort(ctx), + TLS: ConsulTLSConfig(ctx), + }, client, "consul-api-gateway", certManagerOptions, @@ -90,13 +97,19 @@ func (p *gatewayTestEnvironment) run(ctx context.Context, namespace string, cfg // wait for the first write cancelCtx, cancel := context.WithCancel(ctx) group, groupCtx := errgroup.WithContext(cancelCtx) + + // Append token with service:write permissions to allow certmanager to + // watch roots and sign CSRs + groupCtx = metadata.AppendToOutgoingContext(groupCtx, "x-consul-token", ConsulInitialManagementToken(ctx)) + group.Go(func() error { return certManager.Manage(groupCtx) }) - timeoutCtx, timeoutCancel := context.WithTimeout(groupCtx, 10*time.Second) + timeoutCtx, timeoutCancel := context.WithTimeout(groupCtx, 30*time.Second) defer timeoutCancel() err = certManager.WaitForWrite(timeoutCtx) if err != nil { + log.Print(err.Error()) cancel() return err } From 3bc8dc488a4d939ae2297b4158e963b1be2e4c63 Mon Sep 17 00:00:00 2001 From: Mike Morris Date: Mon, 14 Nov 2022 17:14:13 -0500 Subject: [PATCH 2/5] add gRPC mock server for certmanager tests add gRPC WatchRoots mock endpoint to exec mockConsulServer fixup lint --- internal/commands/exec/exec_test.go | 121 ++++++++++++++--- internal/consul/certmanager_test.go | 194 ++++++++++++++++++++++------ 2 files changed, 254 insertions(+), 61 deletions(-) diff --git a/internal/commands/exec/exec_test.go b/internal/commands/exec/exec_test.go index 45a41a196..ffe479ebc 100644 --- a/internal/commands/exec/exec_test.go +++ b/internal/commands/exec/exec_test.go @@ -3,22 +3,31 @@ package exec import ( "context" "encoding/json" + "errors" "fmt" + "net" "net/http" "net/http/httptest" "net/url" "os" "path" + "strconv" "strings" + "sync" "testing" "time" "github.com/stretchr/testify/require" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/proto-public/pbconnectca" "github.com/hashicorp/go-hclog" gwTesting "github.com/hashicorp/consul-api-gateway/internal/testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func TestRunExecLoginError(t *testing.T) { @@ -274,6 +283,8 @@ func TestRunExecEnvoyExecError(t *testing.T) { CertificateDirectory: directory, BootstrapFile: path.Join(directory, "boostrap.json"), Binary: "thisisnotabinary", + XDSAddress: consul.address, + XDSPort: consul.grpcPort, }, isTest: true, })) @@ -315,6 +326,8 @@ func TestRunExecShutdown(t *testing.T) { Binary: "echo", ExtraArgs: []string{output}, Output: &buffer, + XDSAddress: consul.address, + XDSPort: consul.grpcPort, }, isTest: true, })) @@ -370,6 +383,8 @@ func TestRunExecShutdownACLs(t *testing.T) { Binary: "echo", ExtraArgs: []string{output}, Output: &buffer, + XDSAddress: consul.address, + XDSPort: consul.grpcPort, }, isTest: true, })) @@ -391,10 +406,63 @@ type mockConsulServer struct { client *api.Client config *api.Config + address string + httpPort int + grpcPort int + token string rootCertPEM string clientCert string clientPrivateKey string + + mutex sync.RWMutex + + opts mockConsulOptions +} + +func (c *mockConsulServer) WatchRoots(request *pbconnectca.WatchRootsRequest, stream pbconnectca.ConnectCAService_WatchRootsServer) error { + writeCertificate := func() error { + fmt.Printf("writing certificate to channel") + + if c.opts.rootCertFail { + return status.Error(codes.Internal, "root watch failure") + } + + c.mutex.RLock() + ca := c.rootCertPEM + c.mutex.RUnlock() + + if err := stream.Send(&pbconnectca.WatchRootsResponse{ + ActiveRootId: "test", + Roots: []*pbconnectca.CARoot{{ + Id: "test", + RootCert: ca, + }}, + }); err != nil { + return err + } + return nil + } + + // do initial write + if err := writeCertificate(); err != nil { + return err + } + + for { + select { + case <-stream.Context().Done(): + return nil + // case <-c.rotate: + // if err := writeCertificate(); err != nil { + // return err + // } + } + } +} + +func (c *mockConsulServer) Sign(ctx context.Context, request *pbconnectca.SignRequest) (*pbconnectca.SignResponse, error) { + return nil, fmt.Errorf("not yet implemented") } func runMockConsulServer(t *testing.T, opts mockConsulOptions) *mockConsulServer { @@ -406,6 +474,7 @@ func runMockConsulServer(t *testing.T, opts mockConsulOptions) *mockConsulServer rootCertPEM: string(ca.CertBytes), clientCert: string(clientCert.CertBytes), clientPrivateKey: string(clientCert.PrivateKeyBytes), + opts: opts, } loginPath := "/v1/acl/login" @@ -413,7 +482,6 @@ func runMockConsulServer(t *testing.T, opts mockConsulOptions) *mockConsulServer registerPath := "/v1/agent/service/register" deregisterPath := "/v1/agent/service/deregister" leafPath := "/v1/agent/connect/ca/leaf" - rootPath := "/v1/agent/connect/ca/roots" // Start the fake Consul server. consulServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -464,33 +532,50 @@ func runMockConsulServer(t *testing.T, opts mockConsulOptions) *mockConsulServer require.NoError(t, err) return } - if r != nil && r.URL.Path == rootPath && r.Method == "GET" { - if opts.rootCertFail { - w.WriteHeader(http.StatusInternalServerError) - return - } - rootCert, err := json.Marshal(map[string]interface{}{ - "Roots": []map[string]interface{}{{ - "RootCert": server.rootCertPEM, - "Active": true, - }}, - }) - require.NoError(t, err) - _, err = w.Write(rootCert) - require.NoError(t, err) - return - } w.WriteHeader(http.StatusInternalServerError) })) t.Cleanup(consulServer.Close) serverURL, err := url.Parse(consulServer.URL) require.NoError(t, err) + consulHTTPAddress := serverURL.Host + clientConfig := &api.Config{Address: serverURL.String()} client, err := api.NewClient(clientConfig) require.NoError(t, err) - server.client = client server.config = clientConfig + + // httptest.NewServer hardcodes 127.0.0.1, so this will be the same as for + // the gRPC server, just on a different port + consulHTTPAddressParts := strings.Split(consulHTTPAddress, ":") + server.address = consulHTTPAddressParts[0] + server.httpPort, err = strconv.Atoi(consulHTTPAddressParts[1]) + require.NoError(t, err) + fmt.Printf("running Consul HTTP mock server at %s\n", serverURL.String()) + + // Start the fake Consul gRPC server + grpcServer := grpc.NewServer() + pbconnectca.RegisterConnectCAServiceServer(grpcServer, server) + grpcListener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + errEarlyTestTermination := errors.New("early termination") + done := make(chan error, 1) + go func() { + defer func() { + // Write an error to the channel, if the server canceled + // successfully the err will be nil and the read will get that + // first, this will only be read if we have some early exception + // that calls runtime.Goexit prior to the server stopping + done <- errEarlyTestTermination + }() + // Start gRPC mock server, send nil error if clean exit + done <- grpcServer.Serve(grpcListener) + }() + server.grpcPort, err = strconv.Atoi(strings.Split(grpcListener.Addr().String(), ":")[1]) + require.NoError(t, err) + fmt.Printf("running Consul gRPC mock server at %s\n", grpcListener.Addr().String()) + return server } diff --git a/internal/consul/certmanager_test.go b/internal/consul/certmanager_test.go index 73e4754de..1f6642130 100644 --- a/internal/consul/certmanager_test.go +++ b/internal/consul/certmanager_test.go @@ -4,13 +4,17 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" + "net" "net/http" "net/http/httptest" "net/url" "os" "path" + "strconv" "strings" + "sync" "sync/atomic" "testing" "time" @@ -18,8 +22,13 @@ import ( "github.com/stretchr/testify/require" gwTesting "github.com/hashicorp/consul-api-gateway/internal/testing" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/proto-public/pbconnectca" + "github.com/hashicorp/go-hclog" + + "google.golang.org/grpc" ) func TestManage(t *testing.T) { @@ -29,6 +38,7 @@ func TestManage(t *testing.T) { name string leafFailures uint64 rootFailures uint64 + expirations uint64 }{{ name: "test-basic", }, { @@ -44,19 +54,19 @@ func TestManage(t *testing.T) { defer os.RemoveAll(directory) service := gwTesting.RandomString() - server := runCertServer(t, test.leafFailures, test.rootFailures, service, 0) + server := runCertServer(t, service, test.leafFailures, test.rootFailures, test.expirations) options := DefaultCertManagerOptions() options.Directory = directory manager := NewCertManager( - hclog.NewNullLogger(), + hclog.Default().Named("certmanager"), Config{ Addresses: []string{server.consulAddress}, - GRPCPort: 8502, + GRPCPort: server.consulGRPCPort, TLS: nil, }, - NewClient(server.consulClient), + NewClient(server.consulHTTPClient), service, options, ) @@ -115,17 +125,17 @@ func TestManage_Refresh(t *testing.T) { service := gwTesting.RandomString() - server := runCertServer(t, 0, 0, service, 2) + server := runCertServer(t, service, 0, 0, 2) options := DefaultCertManagerOptions() manager := NewCertManager( - hclog.NewNullLogger(), + hclog.Default().Named("certmanager"), Config{ Addresses: []string{server.consulAddress}, - GRPCPort: 8502, + GRPCPort: server.consulGRPCPort, TLS: nil, }, - NewClient(server.consulClient), + NewClient(server.consulHTTPClient), service, options, ) @@ -177,7 +187,7 @@ func TestManage_WaitCancel(t *testing.T) { defer cancel() err := NewCertManager( - hclog.NewNullLogger(), + hclog.Default().Named("certmanager"), Config{}, nil, "", @@ -186,16 +196,82 @@ func TestManage_WaitCancel(t *testing.T) { require.Error(t, err) } -type certServer struct { - consulAddress string - consulClient *api.Client +type testCAHandler struct { + consulAddress string + consulHTTPPort int + consulGRPCPort int + consulHTTPClient *api.Client + consulGRPCClient *pbconnectca.ConnectCAServiceClient + + ca *gwTesting.CertificateInfo fakeRootCertPEM string fakeClientCert string fakeClientPrivateKey string + + rotate chan struct{} + + mutex sync.RWMutex +} + +func (c *testCAHandler) WatchRoots(request *pbconnectca.WatchRootsRequest, stream pbconnectca.ConnectCAService_WatchRootsServer) error { + writeCertificate := func() error { + fmt.Printf("writing certificate to channel") + + c.mutex.RLock() + ca := c.fakeRootCertPEM + c.mutex.RUnlock() + + if err := stream.Send(&pbconnectca.WatchRootsResponse{ + ActiveRootId: "test", + Roots: []*pbconnectca.CARoot{{ + Id: "test", + RootCert: ca, + }}, + }); err != nil { + return err + } + return nil + } + + // do initial write + if err := writeCertificate(); err != nil { + return err + } + + for { + select { + case <-stream.Context().Done(): + return nil + case <-c.rotate: + if err := writeCertificate(); err != nil { + return err + } + } + } +} + +func (c *testCAHandler) Rotate() { + rootCA, err := gwTesting.GenerateSignedCertificate(gwTesting.GenerateCertificateOptions{ + IsCA: true, + }) + if err != nil { + panic(err) + } + + c.mutex.Lock() + c.ca = rootCA + c.fakeRootCertPEM = string(rootCA.CertBytes) + c.mutex.Unlock() + + c.rotate <- struct{}{} } -func runCertServer(t *testing.T, leafFailures, rootFailures uint64, service string, expirations int32) *certServer { +func (c *testCAHandler) Sign(ctx context.Context, request *pbconnectca.SignRequest) (*pbconnectca.SignResponse, error) { + return nil, fmt.Errorf("not yet implemented") +} + +func runCertServer(t *testing.T, service string, leafFailures, rootFailures, expirations uint64) *testCAHandler { t.Helper() ca, _, clientCert := gwTesting.DefaultCertificates() @@ -206,16 +282,17 @@ func runCertServer(t *testing.T, leafFailures, rootFailures uint64, service stri }) require.NoError(t, err) - server := &certServer{ + server := &testCAHandler{ fakeRootCertPEM: string(ca.CertBytes), fakeClientCert: string(clientCert.CertBytes), fakeClientPrivateKey: string(clientCert.PrivateKeyBytes), + rotate: make(chan struct{}), } - // Start the fake Consul server. - consulServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Start the fake Consul HTTP server. + httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { leafPath := fmt.Sprintf("/v1/agent/connect/ca/leaf/%s", service) - rootPath := "/v1/agent/connect/ca/roots" + // rootPath := "/v1/agent/connect/ca/roots" meta := map[string]string{ "X-Consul-Index": "1", } @@ -251,39 +328,70 @@ func runCertServer(t *testing.T, leafFailures, rootFailures uint64, service stri require.NoError(t, err) return } - if r != nil && strings.HasPrefix(r.URL.Path, rootPath) && r.Method == "GET" { - if rootFailures > 0 { - rootFailures-- - w.WriteHeader(http.StatusInternalServerError) - return - } - rootCert, err := json.Marshal(map[string]interface{}{ - "Roots": []map[string]interface{}{{ - "RootCert": server.fakeRootCertPEM, - "Active": true, - }}, - }) - require.NoError(t, err) - for k, v := range meta { - w.Header().Add(k, v) - } - _, err = w.Write(rootCert) - require.NoError(t, err) - return - } + // if r != nil && strings.HasPrefix(r.URL.Path, rootPath) && r.Method == "GET" { + // if rootFailures > 0 { + // rootFailures-- + // w.WriteHeader(http.StatusInternalServerError) + // return + // } + // rootCert, err := json.Marshal(map[string]interface{}{ + // "Roots": []map[string]interface{}{{ + // "RootCert": server.fakeRootCertPEM, + // "Active": true, + // }}, + // }) + // require.NoError(t, err) + // for k, v := range meta { + // w.Header().Add(k, v) + // } + // _, err = w.Write(rootCert) + // require.NoError(t, err) + // return + // } w.WriteHeader(http.StatusInternalServerError) })) - t.Cleanup(consulServer.Close) + t.Cleanup(httpServer.Close) - serverURL, err := url.Parse(consulServer.URL) + serverURL, err := url.Parse(httpServer.URL) require.NoError(t, err) - consulHTTPAddress := serverURL.String() + consulHTTPAddress := serverURL.Host + clientConfig := &api.Config{Address: consulHTTPAddress} client, err := api.NewClient(clientConfig) require.NoError(t, err) + server.consulHTTPClient = client + + // httptest.NewServer hardcodes 127.0.0.1, so this will be the same as for + // the gRPC server, just on a different port + consulHTTPAddressParts := strings.Split(consulHTTPAddress, ":") + server.consulAddress = consulHTTPAddressParts[0] + server.consulHTTPPort, err = strconv.Atoi(consulHTTPAddressParts[1]) + require.NoError(t, err) + fmt.Printf("running Consul HTTP mock server at %s\n", consulHTTPAddress) + + // Start the fake Consul gRPC server + grpcServer := grpc.NewServer() + pbconnectca.RegisterConnectCAServiceServer(grpcServer, server) + grpcListener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + errEarlyTestTermination := errors.New("early termination") + done := make(chan error, 1) + go func() { + defer func() { + // Write an error to the channel, if the server canceled + // successfully the err will be nil and the read will get that + // first, this will only be read if we have some early exception + // that calls runtime.Goexit prior to the server stopping + done <- errEarlyTestTermination + }() + // Start gRPC mock server, send nil error if clean exit + done <- grpcServer.Serve(grpcListener) + }() + server.consulGRPCPort, err = strconv.Atoi(strings.Split(grpcListener.Addr().String(), ":")[1]) + require.NoError(t, err) + fmt.Printf("running Consul gRPC mock server at %s\n", grpcListener.Addr().String()) - server.consulAddress = strings.Split(consulHTTPAddress, ":")[0] - server.consulClient = client return server } @@ -346,7 +454,7 @@ func TestRenderSDS(t *testing.T) { options := DefaultCertManagerOptions() options.Directory = "/certs" manager := NewCertManager( - hclog.NewNullLogger(), + hclog.Default().Named("certmanager"), Config{}, nil, gwTesting.RandomString(), From 62cf4928d87bede615a700618b9e5f7cf99517b8 Mon Sep 17 00:00:00 2001 From: Mike Morris Date: Mon, 14 Nov 2022 22:38:56 -0500 Subject: [PATCH 3/5] refactor CertManagerOptions to check GRPCUseTLS set GRPCUseTLS: false in unit tests set GRPCUseTLS from consulCfg.TLSConfig.CAFile in server and exec --- internal/commands/exec/exec.go | 20 ++++++++----- internal/commands/server/server.go | 22 +++++++++----- internal/consul/certmanager.go | 45 +++++++++++++++++++---------- internal/consul/certmanager_test.go | 20 +++++-------- internal/testing/e2e/consul.go | 11 +++++++ internal/testing/e2e/gateway.go | 9 +++--- 6 files changed, 79 insertions(+), 48 deletions(-) diff --git a/internal/commands/exec/exec.go b/internal/commands/exec/exec.go index 43646d6aa..f61ad6ee7 100644 --- a/internal/commands/exec/exec.go +++ b/internal/commands/exec/exec.go @@ -152,6 +152,8 @@ func RunExec(config ExecConfig) (ret int) { }, ) options := consul.DefaultCertManagerOptions() + options.Addresses = []string{config.EnvoyConfig.XDSAddress} + options.GRPCPort = config.EnvoyConfig.XDSPort options.PrimaryDatacenter = config.PrimaryDatacenter options.SDSAddress = config.EnvoyConfig.SDSAddress options.SDSPort = config.EnvoyConfig.SDSPort @@ -160,18 +162,20 @@ func RunExec(config ExecConfig) (ret int) { options.Directory = config.EnvoyConfig.CertificateDirectory } - tlsConfig, err := api.SetupTLSConfig(&config.ConsulConfig.TLSConfig) - if err != nil { - return 1 + // This is the same check used in exec/command.go for HTTPS API configuration + if config.ConsulConfig.TLSConfig.CAFile != "" { + config.Logger.Info("configuring gateway TLS") + tlsConfig, err := api.SetupTLSConfig(&config.ConsulConfig.TLSConfig) + if err != nil { + return 1 + } + + options.GRPCUseTLS = true + options.GRPCTLS = tlsConfig } certManager := consul.NewCertManager( config.Logger.Named("cert-manager"), - consul.Config{ - Addresses: []string{config.EnvoyConfig.XDSAddress}, - GRPCPort: config.EnvoyConfig.XDSPort, - TLS: tlsConfig, - }, client, config.GatewayConfig.Name, options, diff --git a/internal/commands/server/server.go b/internal/commands/server/server.go index 4f4bb713c..38104d191 100644 --- a/internal/commands/server/server.go +++ b/internal/commands/server/server.go @@ -75,20 +75,26 @@ func RunServer(config ServerConfig) int { controller.SetStore(store) options := consul.DefaultCertManagerOptions() + options.Addresses = []string{strings.Split(config.ConsulConfig.Address, ":")[0]} + options.GRPCPort = config.ConsulGRPCPort options.PrimaryDatacenter = config.PrimaryDatacenter - tlsConfig, err := api.SetupTLSConfig(&config.ConsulConfig.TLSConfig) - if err != nil { - return 1 + // This is the same check used in server/command.go for HTTPS API configuration + if config.ConsulConfig.TLSConfig.CAFile != "" { + config.Logger.Info("configuring controller TLS") + tlsConfig, err := api.SetupTLSConfig(&config.ConsulConfig.TLSConfig) + if err != nil { + return 1 + } + + options.GRPCUseTLS = true + options.GRPCTLS = tlsConfig } + config.Logger.Info(fmt.Sprintf("%+v", options)) + certManager := consul.NewCertManager( config.Logger.Named("cert-manager"), - consul.Config{ - Addresses: []string{strings.Split(config.ConsulConfig.Address, ":")[0]}, - GRPCPort: config.ConsulGRPCPort, - TLS: tlsConfig, - }, client, "consul-api-gateway-controller", options, diff --git a/internal/consul/certmanager.go b/internal/consul/certmanager.go index 6c23e595e..18b9211ac 100644 --- a/internal/consul/certmanager.go +++ b/internal/consul/certmanager.go @@ -43,15 +43,6 @@ var ( sdsCAConfigTemplate = template.New("sdsCA") ) -type Config struct { - Addresses []string - GRPCPort int - Namespace string - Partition string - Datacenter string - TLS *tls.Config -} - type sdsClusterArgs struct { Name string CertSDSConfigPath string @@ -87,7 +78,14 @@ func init() { // CertManagerOptions contains the optional configuration used to initialize a CertManager. type CertManagerOptions struct { + Addresses []string + GRPCPort int + GRPCTLS *tls.Config + GRPCUseTLS bool Directory string + Namespace string + Partition string + Datacenter string PrimaryDatacenter string SDSAddress string SDSPort int @@ -96,6 +94,9 @@ type CertManagerOptions struct { // DefaultCertManagerOptions returns the default options for a CertManager instance. func DefaultCertManagerOptions() *CertManagerOptions { return &CertManagerOptions{ + GRPCPort: 8502, + Namespace: "default", + Partition: "default", SDSAddress: defaultSDSAddress, SDSPort: defaultSDSPort, } @@ -109,14 +110,20 @@ type certWriter func() error // Once a leaf certificate has expired, it generates a new certificate and writes // it to the location given in the configuration options with which it was created. type CertManager struct { - cfg Config apiClient Client grpcClient pbconnectca.ConnectCAServiceClient logger hclog.Logger + addresses []string + grpcPort int + grpcTLS *tls.Config + grpcUseTLS bool service string directory string configDirectory string // only used for testing + namespace string + partition string + datacenter string primaryDatacenter string sdsAddress string sdsPort int @@ -144,14 +151,20 @@ type CertManager struct { } // NewCertManager creates a new CertManager instance. -func NewCertManager(logger hclog.Logger, cfg Config, apiClient Client, service string, options *CertManagerOptions) *CertManager { +func NewCertManager(logger hclog.Logger, apiClient Client, service string, options *CertManagerOptions) *CertManager { if options == nil { options = DefaultCertManagerOptions() } manager := &CertManager{ - cfg: cfg, + addresses: options.Addresses, + grpcPort: options.GRPCPort, + grpcTLS: options.GRPCTLS, + grpcUseTLS: options.GRPCUseTLS, apiClient: apiClient, logger: logger, + namespace: options.Namespace, + partition: options.Partition, + datacenter: options.Datacenter, primaryDatacenter: options.PrimaryDatacenter, sdsAddress: options.SDSAddress, sdsPort: options.SDSPort, @@ -270,14 +283,16 @@ func (c *CertManager) handleLeafWatch(blockParam watch.BlockingParamVal, raw int func (c *CertManager) Manage(ctx context.Context) error { c.logger.Trace("running cert manager") - grpcAddress := fmt.Sprintf("%s:%d", c.cfg.Addresses[0], c.cfg.GRPCPort) + grpcAddress := fmt.Sprintf("%s:%d", c.addresses[0], c.grpcPort) c.logger.Trace("dialing " + grpcAddress) + c.logger.Trace("tls", c.grpcUseTLS) // Default to insecure credentials unless TLS config has been provided tlsCredentials := insecure.NewCredentials() - if c.cfg.TLS != nil { - tlsCredentials = credentials.NewTLS(c.cfg.TLS) + if c.grpcUseTLS { + c.logger.Trace(fmt.Sprintf("configuring gRPC TLS credentials: %+v", c.grpcTLS)) + tlsCredentials = credentials.NewTLS(c.grpcTLS) } conn, err := grpc.DialContext(ctx, grpcAddress, grpc.WithTransportCredentials(tlsCredentials)) diff --git a/internal/consul/certmanager_test.go b/internal/consul/certmanager_test.go index 1f6642130..c2be26bee 100644 --- a/internal/consul/certmanager_test.go +++ b/internal/consul/certmanager_test.go @@ -57,15 +57,13 @@ func TestManage(t *testing.T) { server := runCertServer(t, service, test.leafFailures, test.rootFailures, test.expirations) options := DefaultCertManagerOptions() + options.Addresses = []string{server.consulAddress} + options.GRPCPort = server.consulGRPCPort + options.GRPCUseTLS = false options.Directory = directory manager := NewCertManager( hclog.Default().Named("certmanager"), - Config{ - Addresses: []string{server.consulAddress}, - GRPCPort: server.consulGRPCPort, - TLS: nil, - }, NewClient(server.consulHTTPClient), service, options, @@ -128,13 +126,12 @@ func TestManage_Refresh(t *testing.T) { server := runCertServer(t, service, 0, 0, 2) options := DefaultCertManagerOptions() + options.Addresses = []string{server.consulAddress} + options.GRPCPort = server.consulGRPCPort + options.GRPCUseTLS = false + manager := NewCertManager( hclog.Default().Named("certmanager"), - Config{ - Addresses: []string{server.consulAddress}, - GRPCPort: server.consulGRPCPort, - TLS: nil, - }, NewClient(server.consulHTTPClient), service, options, @@ -188,7 +185,6 @@ func TestManage_WaitCancel(t *testing.T) { err := NewCertManager( hclog.Default().Named("certmanager"), - Config{}, nil, "", nil, @@ -453,9 +449,9 @@ func TestRenderSDS(t *testing.T) { options := DefaultCertManagerOptions() options.Directory = "/certs" + manager := NewCertManager( hclog.Default().Named("certmanager"), - Config{}, nil, gwTesting.RandomString(), options, diff --git a/internal/testing/e2e/consul.go b/internal/testing/e2e/consul.go index d172f767c..5f623b9dd 100644 --- a/internal/testing/e2e/consul.go +++ b/internal/testing/e2e/consul.go @@ -89,6 +89,7 @@ func init() { type consulTestEnvironment struct { ca []byte + consulGRPCUseTLS bool consulTLSConfig *tls.Config consulClient *api.Client token string @@ -182,6 +183,11 @@ func CreateTestConsulContainer(name, namespace string) env.Func { KeyPEM: clientCert.PrivateKeyBytes, } + var consulGRPCUseTLS bool + if consulAPITLSConfig.CAFile != "" || len(consulAPITLSConfig.CAPem) > 0 { + consulGRPCUseTLS = true + } + consulTLSConfig, err := api.SetupTLSConfig(&consulAPITLSConfig) if err != nil { return nil, err @@ -219,6 +225,7 @@ func CreateTestConsulContainer(name, namespace string) env.Func { env := &consulTestEnvironment{ ca: rootCA.CertBytes, + consulGRPCUseTLS: consulGRPCUseTLS, consulTLSConfig: consulTLSConfig, consulClient: consulClient, httpPort: httpsPort, @@ -458,6 +465,10 @@ func consulDeployment(namespace string, httpsPort, grpcPort int) *apps.Deploymen } } +func ConsulGRPCUseTLS(ctx context.Context) bool { + return mustGetTestEnvironment(ctx).consulGRPCUseTLS +} + func ConsulTLSConfig(ctx context.Context) *tls.Config { return mustGetTestEnvironment(ctx).consulTLSConfig } diff --git a/internal/testing/e2e/gateway.go b/internal/testing/e2e/gateway.go index 7b86680ce..fef0fbef1 100644 --- a/internal/testing/e2e/gateway.go +++ b/internal/testing/e2e/gateway.go @@ -81,14 +81,13 @@ func (p *gatewayTestEnvironment) run(ctx context.Context, namespace string, cfg // set up the cert manager certManagerOptions := consul.DefaultCertManagerOptions() + certManagerOptions.Addresses = []string{"localhost"} + certManagerOptions.GRPCPort = ConsulGRPCPort(ctx) + certManagerOptions.GRPCUseTLS = ConsulGRPCUseTLS(ctx) + certManagerOptions.GRPCTLS = ConsulTLSConfig(ctx) certManagerOptions.Directory = p.directory certManager := consul.NewCertManager( nullLogger, - consul.Config{ - Addresses: []string{"localhost"}, - GRPCPort: ConsulGRPCPort(ctx), - TLS: ConsulTLSConfig(ctx), - }, client, "consul-api-gateway", certManagerOptions, From 467d4d4761d778589c9f61fd442645d5838b0927 Mon Sep 17 00:00:00 2001 From: Mike Morris Date: Fri, 11 Nov 2022 23:37:44 -0500 Subject: [PATCH 4/5] add changelog entry --- .changelog/443.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/443.txt diff --git a/.changelog/443.txt b/.changelog/443.txt new file mode 100644 index 000000000..c97536e16 --- /dev/null +++ b/.changelog/443.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +consul: swap CA root watch from Consul Agent API to gRPC +``` From 263e57f77424fc07500d6aff22440790ebeb0b44 Mon Sep 17 00:00:00 2001 From: Mike Morris Date: Mon, 14 Nov 2022 23:36:25 -0500 Subject: [PATCH 5/5] remove Consul 1.11 and 1.12 from CI, add Consul 1.14 to unit tests --- .github/workflows/ci.yml | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c47fbc15d..b30798fd7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,12 +36,10 @@ jobs: strategy: matrix: consul-version: - - 1.11.11 - - 1.11.11+ent - - 1.12.6 - - 1.12.6+ent - 1.13.3 - 1.13.3+ent + - 1.14.0-beta1 + - 1.14.0-beta1+ent runs-on: ubuntu-latest env: TEST_RESULTS_DIR: /tmp/test-results/consul@${{ matrix.consul-version }} @@ -91,10 +89,6 @@ jobs: fail-fast: false matrix: consul-image: - - 'hashicorp/consul:1.11.11' - - 'hashicorp/consul-enterprise:1.11.11-ent' - - 'hashicorp/consul:1.12.6' - - 'hashicorp/consul-enterprise:1.12.6-ent' - 'hashicorp/consul:1.13.3' - 'hashicorp/consul-enterprise:1.13.3-ent' - 'hashicorppreview/consul:1.14-dev'