diff --git a/Makefile b/Makefile index 9aa0bd999df..f205c0e1c17 100644 --- a/Makefile +++ b/Makefile @@ -205,7 +205,7 @@ lint-flags: .PHONY: generate generate: ## Re-generate generated code and documentation -generate: generate-rbac generate-crd-deepcopy generate-crd-yaml generate-gateway-crd-yaml generate-deployment generate-api-docs generate-metrics-docs generate-uml +generate: generate-rbac generate-crd-deepcopy generate-crd-yaml generate-gateway-crd-yaml generate-deployment generate-api-docs generate-metrics-docs generate-uml generate-go .PHONY: generate-rbac generate-rbac: @@ -240,9 +240,14 @@ generate-api-docs: .PHONY: generate-metrics-docs generate-metrics-docs: - @echo Generating metrics documentation ... + @echo "Generating metrics documentation..." @cd site/content/guides/metrics && rm -f *.md && go run ../../../../hack/generate-metrics-doc.go +.PHONY: generate-go +generate-go: + @echo "Generating mocks..." + @go generate ./... + .PHONY: check-generate check-generate: generate @./hack/actions/check-uncommitted-codegen.sh diff --git a/changelogs/unreleased/4202-sunjayBhatia-minor.md b/changelogs/unreleased/4202-sunjayBhatia-minor.md new file mode 100644 index 00000000000..f5afef0a1cc --- /dev/null +++ b/changelogs/unreleased/4202-sunjayBhatia-minor.md @@ -0,0 +1,8 @@ +### Transition to controller-runtime managed leader election + +Contour now uses [controller-runtime](https://github.com/kubernetes-sigs/controller-runtime) Manager-based leader election and coordination of subroutines. +With this change, Contour is also transitioning away from using a ConfigMap for leader election. +In this release, Contour uses a combination of a ConfigMap and a Lease object. +A future release will remove usage of the ConfigMap resource for leader election. + +This change should be a no-op for most users; however, be sure to re-apply the relevant RBAC portions of your deployment to ensure Contour has access to Lease and Event objects (this would be the ClusterRole in the provided example YAML). diff --git a/cmd/contour/ingressstatus.go b/cmd/contour/ingressstatus.go index 385b16f1bfc..1b75406792e 100644 --- a/cmd/contour/ingressstatus.go +++ b/cmd/contour/ingressstatus.go @@ -48,23 +48,16 @@ import ( type loadBalancerStatusWriter struct { log logrus.FieldLogger cache cache.Cache - isLeader chan struct{} lbStatus chan v1.LoadBalancerStatus statusUpdater k8s.StatusUpdater ingressClassName string } -func (isw *loadBalancerStatusWriter) Start(stop <-chan struct{}) error { - // Await leadership election. - isw.log.Info("awaiting leadership election") - select { - case <-stop: - // We were asked to stop before elected leader. - return nil - case <-isw.isLeader: - isw.log.Info("elected leader") - } +func (isw *loadBalancerStatusWriter) NeedLeaderElection() bool { + return true +} +func (isw *loadBalancerStatusWriter) Start(ctx context.Context) error { u := &k8s.StatusAddressUpdater{ Logger: func() logrus.FieldLogger { // Configure the StatusAddressUpdater logger. @@ -98,7 +91,7 @@ func (isw *loadBalancerStatusWriter) Start(stop <-chan struct{}) error { for { select { - case <-stop: + case <-ctx.Done(): // Once started, there's no way to stop the
Clear the load balancer // status so that subsequent informer events diff --git a/cmd/contour/ingressstatus_test.go b/cmd/contour/ingressstatus_test.go index 80d52c9c6d3..0994431d0f1 100644 --- a/cmd/contour/ingressstatus_test.go +++ b/cmd/contour/ingressstatus_test.go @@ -18,7 +18,9 @@ import ( "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/manager" ) func Test_parseStatusFlag(t *testing.T) { @@ -194,3 +196,8 @@ func Test_lbAddress(t *testing.T) { }) } } + +func TestRequireLeaderElection(t *testing.T) { + var l manager.LeaderElectionRunnable = &loadBalancerStatusWriter{} + require.True(t, l.NeedLeaderElection()) +} diff --git a/cmd/contour/leadership.go b/cmd/contour/leadership.go deleted file mode 100644 index 6c67ce50f79..00000000000 --- a/cmd/contour/leadership.go +++ /dev/null @@ -1,157 +0,0 @@ -// Copyright Project Contour Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "context" - "os" - - "github.com/google/uuid" - "github.com/projectcontour/contour/internal/workgroup" - "github.com/projectcontour/contour/pkg/config" - "github.com/sirupsen/logrus" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/leaderelection" - "k8s.io/client-go/tools/leaderelection/resourcelock" -) - -func disableLeaderElection(log logrus.FieldLogger) chan struct{} { - log.Info("Leader election disabled") - - leader := make(chan struct{}) - close(leader) - return leader -} - -// setupLeadershipElection registers leadership workers with the group and returns -// a channel which will become ready when this process becomes the leader, or, in the -// event that leadership election is disabled, the channel will be ready immediately. -func setupLeadershipElection( - g *workgroup.Group, - log logrus.FieldLogger, - conf config.LeaderElectionParameters, - client *kubernetes.Clientset, updateNow func(), -) chan struct{} { - le, leader, deposed := newLeaderElector(log, conf, client) - - g.AddContext(func(electionCtx context.Context) error { - log.WithFields(logrus.Fields{ - "configmapname": conf.Name, - "configmapnamespace": conf.Namespace, - }).Info("started leader election") - - le.Run(electionCtx) - log.Info("stopped leader election") - return nil - }) - - g.Add(func(stop <-chan struct{}) error { - log := log.WithField("context", "leaderelection") - for { - select { - case <-stop: - // shut down - log.Info("stopped leader election") - return nil - case <-leader: - log.Info("elected as leader, triggering rebuild") - updateNow() - - // disable this case - leader = nil - case <-deposed: - // If we get deposed as leader, shut it down. - log.Info("deposed as leader, shutting down") - return nil - } - } - }) - - return leader -} - -// newLeaderElector creates a new leaderelection.LeaderElector and associated -// channels by which to observe elections and depositions. 
-func newLeaderElector( - log logrus.FieldLogger, - conf config.LeaderElectionParameters, - client *kubernetes.Clientset, -) (*leaderelection.LeaderElector, chan struct{}, chan struct{}) { - log = log.WithField("context", "leaderelection") - // leaderOK will block gRPC startup until it's closed. - leaderOK := make(chan struct{}) - // deposed is closed by the leader election callback when - // we are deposed as leader so that we can clean up. - deposed := make(chan struct{}) - - rl := newResourceLock(log, conf, client) - - // Make the leader elector, ready to be used in the Workgroup. - le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ - Lock: rl, - LeaseDuration: conf.LeaseDuration, - RenewDeadline: conf.RenewDeadline, - RetryPeriod: conf.RetryPeriod, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: func(_ context.Context) { - log.WithFields(logrus.Fields{ - "lock": rl.Describe(), - "identity": rl.Identity(), - }).Info("elected leader") - close(leaderOK) - }, - OnStoppedLeading: func() { - // The context being canceled will trigger a handler that will - // deal with being deposed. - close(deposed) - }, - }, - }) - if err != nil { - log.WithError(err).Fatal("failed to create leader elector") - } - return le, leaderOK, deposed -} - -// newResourceLock creates a new resourcelock.Interface based on the Pod's name, -// or a uuid if the name cannot be determined. -func newResourceLock(log logrus.FieldLogger, conf config.LeaderElectionParameters, client *kubernetes.Clientset) resourcelock.Interface { - resourceLockID, found := os.LookupEnv("POD_NAME") - if !found { - resourceLockID = uuid.New().String() - } - - rl, err := resourcelock.New( - // TODO(youngnick) change this to a Lease object instead - // of the configmap once the Lease API has been GA for a full support - // cycle (ie nine months). - // Figure out the resource lock ID - resourcelock.ConfigMapsResourceLock, - conf.Namespace, - conf.Name, - client.CoreV1(), - client.CoordinationV1(), - resourcelock.ResourceLockConfig{ - Identity: resourceLockID, - }, - ) - if err != nil { - log.WithError(err). - WithField("name", conf.Name). - WithField("namespace", conf.Namespace). - WithField("identity", resourceLockID). - Fatal("failed to create new resource lock") - } - return rl -} diff --git a/cmd/contour/serve.go b/cmd/contour/serve.go index 10a6514957a..404c14d9896 100644 --- a/cmd/contour/serve.go +++ b/cmd/contour/serve.go @@ -20,10 +20,8 @@ import ( "net" "net/http" "os" - "os/signal" "regexp" "strconv" - "syscall" "time" envoy_server_v3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" @@ -39,9 +37,9 @@ import ( "github.com/projectcontour/contour/internal/health" "github.com/projectcontour/contour/internal/httpsvc" "github.com/projectcontour/contour/internal/k8s" + "github.com/projectcontour/contour/internal/leadership" "github.com/projectcontour/contour/internal/metrics" "github.com/projectcontour/contour/internal/timeout" - "github.com/projectcontour/contour/internal/workgroup" "github.com/projectcontour/contour/internal/xds" contour_xds_v3 "github.com/projectcontour/contour/internal/xds/v3" "github.com/projectcontour/contour/internal/xdscache" @@ -169,7 +167,6 @@ func registerServe(app *kingpin.Application) (*kingpin.CmdClause, *serveContext) } type Server struct { - group workgroup.Group log logrus.FieldLogger ctx *serveContext coreClient *kubernetes.Clientset @@ -181,9 +178,6 @@ type Server struct { // objects required to start an instance of Contour. 
func NewServer(log logrus.FieldLogger, ctx *serveContext) (*Server, error) { - // Set up workgroup runner. - var group workgroup.Group - // Establish k8s core client connection. restConfig, err := k8s.NewRestConfig(ctx.Config.Kubeconfig, ctx.Config.InCluster) if err != nil { @@ -201,9 +195,25 @@ func NewServer(log logrus.FieldLogger, ctx *serveContext) (*Server, error) { } // Instantiate a controller-runtime manager. - mgr, err := manager.New(restConfig, manager.Options{ + options := manager.Options{ Scheme: scheme, - }) + } + if ctx.DisableLeaderElection { + log.Info("Leader election disabled") + options.LeaderElection = false + } else { + options.LeaderElection = true + // This represents a multilock on configmaps and leases. + // TODO: switch to solely "leases" once a release cycle has passed. + options.LeaderElectionResourceLock = "configmapsleases" + options.LeaderElectionNamespace = ctx.Config.LeaderElection.Namespace + options.LeaderElectionID = ctx.Config.LeaderElection.Name + options.LeaseDuration = &ctx.Config.LeaderElection.LeaseDuration + options.RenewDeadline = &ctx.Config.LeaderElection.RenewDeadline + options.RetryPeriod = &ctx.Config.LeaderElection.RetryPeriod + options.LeaderElectionReleaseOnCancel = true + } + mgr, err := manager.New(restConfig, options) if err != nil { return nil, fmt.Errorf("unable to set up controller manager: %w", err) } @@ -214,7 +224,6 @@ func NewServer(log logrus.FieldLogger, ctx *serveContext) (*Server, error) { registry.MustRegister(collectors.NewGoCollector()) return &Server{ - group: group, log: log, ctx: ctx, coreClient: coreClient, @@ -260,11 +269,6 @@ func (s *Server) doServe() error { return err } - // Register the manager with the workgroup. - s.group.AddContext(func(taskCtx context.Context) error { - return s.mgr.Start(signals.SetupSignalHandler()) - }) - // informerNamespaces is a list of namespaces that we should start informers for. var informerNamespaces []string @@ -380,24 +384,36 @@ func (s *Server) doServe() error { fallbackCert = &types.NamespacedName{Name: contourConfiguration.HTTPProxy.FallbackCertificate.Name, Namespace: contourConfiguration.HTTPProxy.FallbackCertificate.Namespace} } + sh := k8s.NewStatusUpdateHandler(s.log.WithField("context", "StatusUpdateHandler"), s.mgr.GetClient()) + if err := s.mgr.Add(sh); err != nil { + return err + } + + builder := s.getDAGBuilder(dagBuilderConfig{ + ingressClassName: ingressClassName, + rootNamespaces: contourConfiguration.HTTPProxy.RootNamespaces, + gatewayAPIConfigured: contourConfiguration.Gateway != nil, + disablePermitInsecure: contourConfiguration.HTTPProxy.DisablePermitInsecure, + enableExternalNameService: contourConfiguration.EnableExternalNameService, + dnsLookupFamily: contourConfiguration.Envoy.Cluster.DNSLookupFamily, + headersPolicy: contourConfiguration.Policy, + clientCert: clientCert, + fallbackCert: fallbackCert, + }) + // Build the core Kubernetes event handler. 
- contourHandler := &contour.EventHandler{ + observer := contour.NewRebuildMetricsObserver( + contourMetrics, + dag.ComposeObservers(append(xdscache.ObserversOf(resources), snapshotHandler)...), + ) + contourHandler := contour.NewEventHandler(contour.EventHandlerConfig{ + Logger: s.log.WithField("context", "contourEventHandler"), HoldoffDelay: 100 * time.Millisecond, HoldoffMaxDelay: 500 * time.Millisecond, - Observer: dag.ComposeObservers(append(xdscache.ObserversOf(resources), snapshotHandler)...), - Builder: s.getDAGBuilder(dagBuilderConfig{ - ingressClassName: ingressClassName, - rootNamespaces: contourConfiguration.HTTPProxy.RootNamespaces, - gatewayAPIConfigured: contourConfiguration.Gateway != nil, - disablePermitInsecure: contourConfiguration.HTTPProxy.DisablePermitInsecure, - enableExternalNameService: contourConfiguration.EnableExternalNameService, - dnsLookupFamily: contourConfiguration.Envoy.Cluster.DNSLookupFamily, - headersPolicy: contourConfiguration.Policy, - clientCert: clientCert, - fallbackCert: fallbackCert, - }), - FieldLogger: s.log.WithField("context", "contourEventHandler"), - } + Observer: observer, + StatusUpdater: sh.Writer(), + Builder: builder, + }) // Wrap contourHandler in an EventRecorder which tracks API server events. eventHandler := &contour.EventRecorder{ @@ -405,21 +421,6 @@ func (s *Server) doServe() error { Counter: contourMetrics.EventHandlerOperations, } - // Register leadership election. - if s.ctx.DisableLeaderElection { - contourHandler.IsLeader = disableLeaderElection(s.log) - } else { - contourHandler.IsLeader = setupLeadershipElection(&s.group, s.log, s.ctx.Config.LeaderElection, s.coreClient, contourHandler.UpdateNow) - } - - // Start setting up StatusUpdateHandler since we need it in - // the Gateway API controllers. Will finish setting it up and - // start it later. - sh := k8s.StatusUpdateHandler{ - Log: s.log.WithField("context", "StatusUpdateHandler"), - Client: s.mgr.GetClient(), - } - // Inform on default resources. for name, r := range map[string]client.Object{ "httpproxies": &contour_api_v1.HTTPProxy{}, @@ -436,7 +437,7 @@ func (s *Server) doServe() error { } // Inform on Gateway API resources. - s.setupGatewayAPI(contourConfiguration, s.mgr, eventHandler, &sh, contourHandler.IsLeader) + needsNotification := s.setupGatewayAPI(contourConfiguration, s.mgr, eventHandler, sh) // Inform on secrets, filtering by root namespaces. var handler cache.ResourceEventHandler = eventHandler @@ -458,45 +459,37 @@ func (s *Server) doServe() error { s.log.WithError(err).WithField("resource", "endpoints").Fatal("failed to create informer") } - // Register our event handler with the workgroup. - s.group.Add(contourHandler.Start()) + // Register our event handler with the manager. + if err := s.mgr.Add(contourHandler); err != nil { + return err + } // Create metrics service. - s.setupMetrics(contourConfiguration.Metrics, contourConfiguration.Health, s.registry) + if err := s.setupMetrics(contourConfiguration.Metrics, contourConfiguration.Health, s.registry); err != nil { + return err + } // Create a separate health service if required. - s.setupHealth(contourConfiguration.Health, contourConfiguration.Metrics) + if err := s.setupHealth(contourConfiguration.Health, contourConfiguration.Metrics); err != nil { + return err + } // Create debug service and register with workgroup. - s.setupDebugService(contourConfiguration.Debug, contourHandler) - - // Once we have the leadership detection channel, we can - // push DAG rebuild metrics onto the observer stack. 
- contourHandler.Observer = &contour.RebuildMetricsObserver{ - Metrics: contourMetrics, - IsLeader: contourHandler.IsLeader, - NextObserver: contourHandler.Observer, + if err := s.setupDebugService(contourConfiguration.Debug, builder); err != nil { + return err } - // Finish setting up the StatusUpdateHandler and - // add it to the work group. - sh.LeaderElected = contourHandler.IsLeader - s.group.Add(sh.Start) - - // Now we have the statusUpdateHandler, we can create the event handler's StatusUpdater, which will take the - // status updates from the DAG, and send them to the status update handler. - contourHandler.StatusUpdater = sh.Writer() - // Set up ingress load balancer status writer. - lbsw := loadBalancerStatusWriter{ + lbsw := &loadBalancerStatusWriter{ log: s.log.WithField("context", "loadBalancerStatusWriter"), cache: s.mgr.GetCache(), - isLeader: contourHandler.IsLeader, lbStatus: make(chan corev1.LoadBalancerStatus, 1), ingressClassName: ingressClassName, statusUpdater: sh.Writer(), } - s.group.Add(lbsw.Start) + if err := s.mgr.Add(lbsw); err != nil { + return err + } // Register an informer to watch envoy's service if we haven't been given static details. if contourConfiguration.Ingress != nil && contourConfiguration.Ingress.StatusAddress != nil { @@ -523,23 +516,30 @@ func (s *Server) doServe() error { Info("Watching Service for Ingress status") } - s.setupXDSServer(s.mgr, s.registry, contourConfiguration.XDSServer, snapshotHandler, resources) + xdsServer := &xdsServer{ + log: s.log, + mgr: s.mgr, + registry: s.registry, + config: contourConfiguration.XDSServer, + snapshotHandler: snapshotHandler, + resources: resources, + } + if err := s.mgr.Add(xdsServer); err != nil { + return err + } - // Set up SIGTERM handler for graceful shutdown. - s.group.Add(func(stop <-chan struct{}) error { - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGTERM, syscall.SIGINT) - select { - case sig := <-c: - s.log.WithField("context", "sigterm-handler").WithField("signal", sig).Info("shutting down") - case <-stop: - // Do nothing. The group is shutting down. - } - return nil - }) + notifier := &leadership.Notifier{ + ToNotify: append([]leadership.NeedLeaderElectionNotification{ + contourHandler, + observer, + }, needsNotification...), + } + if err := s.mgr.Add(notifier); err != nil { + return err + } // GO! 
- return s.group.Run(context.Background()) + return s.mgr.Start(signals.SetupSignalHandler()) } func (s *Server) setupRateLimitService(contourConfiguration contour_api_v1alpha1.ContourConfigurationSpec) (*xdscache_v3.RateLimitConfig, error) { @@ -580,80 +580,89 @@ func (s *Server) setupRateLimitService(contourConfiguration contour_api_v1alpha1 }, nil } -func (s *Server) setupDebugService(debugConfig contour_api_v1alpha1.DebugConfig, contourHandler *contour.EventHandler) { - debugsvc := debug.Service{ +func (s *Server) setupDebugService(debugConfig contour_api_v1alpha1.DebugConfig, builder *dag.Builder) error { + debugsvc := &debug.Service{ Service: httpsvc.Service{ Addr: debugConfig.Address, Port: debugConfig.Port, FieldLogger: s.log.WithField("context", "debugsvc"), }, - Builder: &contourHandler.Builder, + Builder: builder, } - s.group.Add(debugsvc.Start) + return s.mgr.Add(debugsvc) } -func (s *Server) setupXDSServer(mgr manager.Manager, registry *prometheus.Registry, contourConfiguration contour_api_v1alpha1.XDSServerConfig, - snapshotHandler *xdscache.SnapshotHandler, resources []xdscache.ResourceCache) { +type xdsServer struct { + log logrus.FieldLogger + mgr manager.Manager + registry *prometheus.Registry + config contour_api_v1alpha1.XDSServerConfig + snapshotHandler *xdscache.SnapshotHandler + resources []xdscache.ResourceCache +} - s.group.AddContext(func(taskCtx context.Context) error { - log := s.log.WithField("context", "xds") +func (x *xdsServer) NeedLeaderElection() bool { + return false +} - log.Printf("waiting for informer caches to sync") - if !mgr.GetCache().WaitForCacheSync(taskCtx) { - return errors.New("informer cache failed to sync") - } - log.Printf("informer caches synced") - - grpcServer := xds.NewServer(registry, grpcOptions(log, contourConfiguration.TLS)...) - - switch contourConfiguration.Type { - case contour_api_v1alpha1.EnvoyServerType: - v3cache := contour_xds_v3.NewSnapshotCache(false, log) - snapshotHandler.AddSnapshotter(v3cache) - contour_xds_v3.RegisterServer(envoy_server_v3.NewServer(taskCtx, v3cache, contour_xds_v3.NewRequestLoggingCallbacks(log)), grpcServer) - case contour_api_v1alpha1.ContourServerType: - contour_xds_v3.RegisterServer(contour_xds_v3.NewContourServer(log, xdscache.ResourcesOf(resources)...), grpcServer) - default: - // This can't happen due to config validation. - log.Fatalf("invalid xDS server type %q", contourConfiguration.Type) - } +func (x *xdsServer) Start(ctx context.Context) error { + log := x.log.WithField("context", "xds") - addr := net.JoinHostPort(contourConfiguration.Address, strconv.Itoa(contourConfiguration.Port)) - l, err := net.Listen("tcp", addr) - if err != nil { - return err - } + log.Printf("waiting for informer caches to sync") + if !x.mgr.GetCache().WaitForCacheSync(ctx) { + return errors.New("informer cache failed to sync") + } + log.Printf("informer caches synced") - log = log.WithField("address", addr) - if tls := contourConfiguration.TLS; tls != nil { - if tls.Insecure { - log = log.WithField("insecure", true) - } + grpcServer := xds.NewServer(x.registry, grpcOptions(log, x.config.TLS)...) 
+ + switch x.config.Type { + case contour_api_v1alpha1.EnvoyServerType: + v3cache := contour_xds_v3.NewSnapshotCache(false, log) + x.snapshotHandler.AddSnapshotter(v3cache) + contour_xds_v3.RegisterServer(envoy_server_v3.NewServer(ctx, v3cache, contour_xds_v3.NewRequestLoggingCallbacks(log)), grpcServer) + case contour_api_v1alpha1.ContourServerType: + contour_xds_v3.RegisterServer(contour_xds_v3.NewContourServer(log, xdscache.ResourcesOf(x.resources)...), grpcServer) + default: + // This can't happen due to config validation. + log.Fatalf("invalid xDS server type %q", x.config.Type) + } + + addr := net.JoinHostPort(x.config.Address, strconv.Itoa(x.config.Port)) + l, err := net.Listen("tcp", addr) + if err != nil { + return err + } + + log = log.WithField("address", addr) + if tls := x.config.TLS; tls != nil { + if tls.Insecure { + log = log.WithField("insecure", true) } + } - log.Infof("started xDS server type: %q", contourConfiguration.Type) - defer log.Info("stopped xDS server") + log.Infof("started xDS server type: %q", x.config.Type) + defer log.Info("stopped xDS server") - go func() { - <-taskCtx.Done() + go func() { + <-ctx.Done() - // We don't use GracefulStop here because envoy - // has long-lived hanging xDS requests. There's no - // mechanism to make those pending requests fail, - // so we forcibly terminate the TCP sessions. - grpcServer.Stop() - }() + // We don't use GracefulStop here because envoy + // has long-lived hanging xDS requests. There's no + // mechanism to make those pending requests fail, + // so we forcibly terminate the TCP sessions. + grpcServer.Stop() + }() - return grpcServer.Serve(l) - }) + return grpcServer.Serve(l) } // setupMetrics creates metrics service for Contour. func (s *Server) setupMetrics(metricsConfig contour_api_v1alpha1.MetricsConfig, healthConfig contour_api_v1alpha1.HealthConfig, - registry *prometheus.Registry) { + registry *prometheus.Registry) error { // Create metrics service and register with workgroup. 
- metricsvc := httpsvc.Service{ + metricsvc := &httpsvc.Service{ Addr: metricsConfig.Address, Port: metricsConfig.Port, FieldLogger: s.log.WithField("context", "metricsvc"), @@ -674,14 +683,14 @@ func (s *Server) setupMetrics(metricsConfig contour_api_v1alpha1.MetricsConfig, metricsvc.ServeMux.Handle("/healthz", h) } - s.group.Add(metricsvc.Start) + return s.mgr.Add(metricsvc) } func (s *Server) setupHealth(healthConfig contour_api_v1alpha1.HealthConfig, - metricsConfig contour_api_v1alpha1.MetricsConfig) { + metricsConfig contour_api_v1alpha1.MetricsConfig) error { if healthConfig.Address != metricsConfig.Address || healthConfig.Port != metricsConfig.Port { - healthsvc := httpsvc.Service{ + healthsvc := &httpsvc.Service{ Addr: healthConfig.Address, Port: healthConfig.Port, FieldLogger: s.log.WithField("context", "healthsvc"), @@ -691,47 +700,53 @@ func (s *Server) setupHealth(healthConfig contour_api_v1alpha1.HealthConfig, healthsvc.ServeMux.Handle("/health", h) healthsvc.ServeMux.Handle("/healthz", h) - s.group.Add(healthsvc.Start) + return s.mgr.Add(healthsvc) } + + return nil } func (s *Server) setupGatewayAPI(contourConfiguration contour_api_v1alpha1.ContourConfigurationSpec, - mgr manager.Manager, eventHandler *contour.EventRecorder, sh *k8s.StatusUpdateHandler, isLeader chan struct{}) { + mgr manager.Manager, eventHandler *contour.EventRecorder, sh *k8s.StatusUpdateHandler) []leadership.NeedLeaderElectionNotification { + + needLeadershipNotification := []leadership.NeedLeaderElectionNotification{} // Check if GatewayAPI is configured. if contourConfiguration.Gateway != nil { // Create and register the gatewayclass controller with the manager. gatewayClassControllerName := contourConfiguration.Gateway.ControllerName - if _, err := controller.NewGatewayClassController( + gwClass, err := controller.RegisterGatewayClassController( + s.log.WithField("context", "gatewayclass-controller"), mgr, eventHandler, sh.Writer(), - s.log.WithField("context", "gatewayclass-controller"), gatewayClassControllerName, - isLeader, - ); err != nil { + ) + if err != nil { s.log.WithError(err).Fatal("failed to create gatewayclass-controller") } + needLeadershipNotification = append(needLeadershipNotification, gwClass) // Create and register the NewGatewayController controller with the manager. - if _, err := controller.NewGatewayController( + gw, err := controller.RegisterGatewayController( + s.log.WithField("context", "gateway-controller"), mgr, eventHandler, sh.Writer(), - s.log.WithField("context", "gateway-controller"), gatewayClassControllerName, - isLeader, - ); err != nil { + ) + if err != nil { s.log.WithError(err).Fatal("failed to create gateway-controller") } + needLeadershipNotification = append(needLeadershipNotification, gw) // Create and register the HTTPRoute controller with the manager. - if _, err := controller.NewHTTPRouteController(mgr, eventHandler, s.log.WithField("context", "httproute-controller")); err != nil { + if err := controller.RegisterHTTPRouteController(s.log.WithField("context", "httproute-controller"), mgr, eventHandler); err != nil { s.log.WithError(err).Fatal("failed to create httproute-controller") } // Create and register the TLSRoute controller with the manager. 
- if _, err := controller.NewTLSRouteController(mgr, eventHandler, s.log.WithField("context", "tlsroute-controller")); err != nil { + if err := controller.RegisterTLSRouteController(s.log.WithField("context", "tlsroute-controller"), mgr, eventHandler); err != nil { s.log.WithError(err).Fatal("failed to create tlsroute-controller") } @@ -745,6 +760,7 @@ func (s *Server) setupGatewayAPI(contourConfiguration contour_api_v1alpha1.Conto s.log.WithError(err).WithField("resource", "namespaces").Fatal("failed to create informer") } } + return needLeadershipNotification } type dagBuilderConfig struct { @@ -760,7 +776,7 @@ type dagBuilderConfig struct { fallbackCert *types.NamespacedName } -func (s *Server) getDAGBuilder(dbc dagBuilderConfig) dag.Builder { +func (s *Server) getDAGBuilder(dbc dagBuilderConfig) *dag.Builder { var requestHeadersPolicy dag.HeadersPolicy var responseHeadersPolicy dag.HeadersPolicy @@ -847,7 +863,7 @@ func (s *Server) getDAGBuilder(dbc dagBuilderConfig) dag.Builder { configuredSecretRefs = append(configuredSecretRefs, dbc.clientCert) } - builder := dag.Builder{ + builder := &dag.Builder{ Source: dag.KubernetesCache{ RootNamespaces: dbc.rootNamespaces, IngressClassName: dbc.ingressClassName, diff --git a/cmd/contour/serve_test.go b/cmd/contour/serve_test.go index 6b79713a598..9d697d4bda5 100644 --- a/cmd/contour/serve_test.go +++ b/cmd/contour/serve_test.go @@ -41,7 +41,7 @@ func TestGetDAGBuilder(t *testing.T) { log: logrus.StandardLogger(), } got := serve.getDAGBuilder(dagBuilderConfig{rootNamespaces: []string{}, dnsLookupFamily: contour_api_v1alpha1.AutoClusterDNSFamily}) - commonAssertions(t, &got) + commonAssertions(t, got) assert.Empty(t, got.Source.ConfiguredSecretRefs) }) @@ -52,7 +52,7 @@ func TestGetDAGBuilder(t *testing.T) { log: logrus.StandardLogger(), } got := serve.getDAGBuilder(dagBuilderConfig{rootNamespaces: []string{}, dnsLookupFamily: contour_api_v1alpha1.AutoClusterDNSFamily, clientCert: clientCert}) - commonAssertions(t, &got) + commonAssertions(t, got) assert.ElementsMatch(t, got.Source.ConfiguredSecretRefs, []*types.NamespacedName{clientCert}) }) @@ -63,7 +63,7 @@ func TestGetDAGBuilder(t *testing.T) { log: logrus.StandardLogger(), } got := serve.getDAGBuilder(dagBuilderConfig{rootNamespaces: []string{}, dnsLookupFamily: contour_api_v1alpha1.AutoClusterDNSFamily, fallbackCert: fallbackCert}) - commonAssertions(t, &got) + commonAssertions(t, got) assert.ElementsMatch(t, got.Source.ConfiguredSecretRefs, []*types.NamespacedName{fallbackCert}) }) @@ -75,7 +75,7 @@ func TestGetDAGBuilder(t *testing.T) { log: logrus.StandardLogger(), } got := serve.getDAGBuilder(dagBuilderConfig{rootNamespaces: []string{}, dnsLookupFamily: contour_api_v1alpha1.AutoClusterDNSFamily, clientCert: clientCert, fallbackCert: fallbackCert}) - commonAssertions(t, &got) + commonAssertions(t, got) assert.ElementsMatch(t, got.Source.ConfiguredSecretRefs, []*types.NamespacedName{clientCert, fallbackCert}) }) @@ -103,15 +103,15 @@ func TestGetDAGBuilder(t *testing.T) { log: logrus.StandardLogger(), } got := serve.getDAGBuilder(dagBuilderConfig{rootNamespaces: []string{}, dnsLookupFamily: contour_api_v1alpha1.AutoClusterDNSFamily, headersPolicy: policy}) - commonAssertions(t, &got) + commonAssertions(t, got) - httpProxyProcessor := mustGetHTTPProxyProcessor(t, &got) + httpProxyProcessor := mustGetHTTPProxyProcessor(t, got) assert.EqualValues(t, policy.RequestHeadersPolicy.Set, httpProxyProcessor.RequestHeadersPolicy.Set) assert.ElementsMatch(t, policy.RequestHeadersPolicy.Remove, 
httpProxyProcessor.RequestHeadersPolicy.Remove) assert.EqualValues(t, policy.ResponseHeadersPolicy.Set, httpProxyProcessor.ResponseHeadersPolicy.Set) assert.ElementsMatch(t, policy.ResponseHeadersPolicy.Remove, httpProxyProcessor.ResponseHeadersPolicy.Remove) - ingressProcessor := mustGetIngressProcessor(t, &got) + ingressProcessor := mustGetIngressProcessor(t, got) assert.EqualValues(t, map[string]string(nil), ingressProcessor.RequestHeadersPolicy.Set) assert.ElementsMatch(t, map[string]string(nil), ingressProcessor.RequestHeadersPolicy.Remove) assert.EqualValues(t, map[string]string(nil), ingressProcessor.ResponseHeadersPolicy.Set) @@ -143,9 +143,9 @@ func TestGetDAGBuilder(t *testing.T) { } got := serve.getDAGBuilder(dagBuilderConfig{rootNamespaces: []string{}, dnsLookupFamily: contour_api_v1alpha1.AutoClusterDNSFamily, headersPolicy: policy, applyHeaderPolicyToIngress: true}) - commonAssertions(t, &got) + commonAssertions(t, got) - ingressProcessor := mustGetIngressProcessor(t, &got) + ingressProcessor := mustGetIngressProcessor(t, got) assert.EqualValues(t, policy.RequestHeadersPolicy.Set, ingressProcessor.RequestHeadersPolicy.Set) assert.ElementsMatch(t, policy.RequestHeadersPolicy.Remove, ingressProcessor.RequestHeadersPolicy.Remove) assert.EqualValues(t, policy.ResponseHeadersPolicy.Set, ingressProcessor.ResponseHeadersPolicy.Set) diff --git a/examples/contour/02-role-contour.yaml b/examples/contour/02-role-contour.yaml index c6b3129d4b8..f4571bd8a46 100644 --- a/examples/contour/02-role-contour.yaml +++ b/examples/contour/02-role-contour.yaml @@ -13,6 +13,7 @@ rules: - "" resources: - configmaps + - events verbs: - create - get @@ -28,6 +29,14 @@ rules: - get - list - watch +- apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - create + - get + - update - apiGroups: - gateway.networking.k8s.io resources: diff --git a/examples/render/contour-gateway.yaml b/examples/render/contour-gateway.yaml index b89acb56373..0dcc24fbf52 100644 --- a/examples/render/contour-gateway.yaml +++ b/examples/render/contour-gateway.yaml @@ -4843,6 +4843,7 @@ rules: - "" resources: - configmaps + - events verbs: - create - get @@ -4858,6 +4859,14 @@ rules: - get - list - watch +- apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - create + - get + - update - apiGroups: - gateway.networking.k8s.io resources: diff --git a/examples/render/contour.yaml b/examples/render/contour.yaml index 8f32a0d1ba5..8ad0fc4ae42 100644 --- a/examples/render/contour.yaml +++ b/examples/render/contour.yaml @@ -4840,6 +4840,7 @@ rules: - "" resources: - configmaps + - events verbs: - create - get @@ -4855,6 +4856,14 @@ rules: - get - list - watch +- apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - create + - get + - update - apiGroups: - gateway.networking.k8s.io resources: diff --git a/go.mod b/go.mod index bbac933e4ef..07da5e12cf0 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/golang/protobuf v1.5.2 github.com/google/go-cmp v0.5.6 github.com/google/go-github/v39 v39.0.0 - github.com/google/uuid v1.1.2 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/jetstack/cert-manager v1.5.1 github.com/onsi/ginkgo v1.16.5-0.20211011165036-638dfbc0fced @@ -22,6 +21,7 @@ require ( github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.0 github.com/tsaarni/certyaml v0.6.2 + github.com/vektra/mockery/v2 v2.9.4 golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914 google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c 
google.golang.org/grpc v1.38.0 diff --git a/go.sum b/go.sum index c4300027291..34261ff5803 100644 --- a/go.sum +++ b/go.sum @@ -500,6 +500,7 @@ github.com/googleapis/gnostic v0.5.1/go.mod h1:6U4PtQXGIEt/Z3h5MAT7FNofLnw9vXk2c github.com/googleapis/gnostic v0.5.5 h1:9fHAtK0uDfpveeqqo1hkEZJcFvYXAiCN3UutL8F9xHw= github.com/googleapis/gnostic v0.5.5/go.mod h1:7+EbHbldMins07ALC74bsA81Ovc97DwqyJO1AENw9kA= github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ= @@ -559,6 +560,7 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= @@ -608,6 +610,7 @@ github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMW github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= @@ -639,6 +642,7 @@ github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls= github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= @@ -679,6 +683,7 @@ github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceT github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw= github.com/mitchellh/copystructure v1.1.1/go.mod 
h1:EBArHfARyrSWO/+Wyr9zwEkc6XMFB9XyNgFNmRkZZU4= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= @@ -688,6 +693,7 @@ github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0Qu github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.3.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f/go.mod h1:OkQIRizQZAeMln+1tSwduZz7+Af5oFlKirV/MSYes2A= github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= @@ -781,6 +787,7 @@ github.com/pavel-v-chernykh/keystore-go v2.1.0+incompatible/go.mod h1:xlUlxe/2It github.com/pavel-v-chernykh/keystore-go/v4 v4.1.0/go.mod h1:2ejgys4qY+iNVW1IittZhyRYA6MNv8TgM6VHqojbB9g= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5dSQ= github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= @@ -849,6 +856,9 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.3.2/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.4.0/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= +github.com/rs/zerolog v1.18.0 h1:CbAm3kP2Tptby1i9sYy2MGRg0uxIN9cyDb59Ys7W8z8= +github.com/rs/zerolog v1.18.0/go.mod h1:9nvC1axdVrAHcu/s9taAVfBuIdTZLVQmKQyvrUjF5+I= github.com/rubenv/sql-migrate v0.0.0-20200616145509-8d140a17f351/go.mod h1:DCgfY80j8GYL7MLEfvcpSFvjD0L5yZq/aZUJmhZklyg= github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= @@ -875,8 +885,10 @@ github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/assertions v1.2.0 
h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= @@ -884,8 +896,10 @@ github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= +github.com/spf13/afero v1.6.0 h1:xoax2sJ2DT8S8xA2paPFjDCScCNeWsg75VG0DLRreiY= github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng= github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v0.0.2-0.20171109065643-2da4a54c5cee/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= @@ -896,6 +910,7 @@ github.com/spf13/cobra v1.1.3/go.mod h1:pGADOWyqRD/YMrPZigI/zbliZ2wVD/23d+is3pSW github.com/spf13/cobra v1.2.1 h1:+KmjbUw1hriSNMF55oPrkZcb27aECyrj8V2ytv7kWDw= github.com/spf13/cobra v1.2.1/go.mod h1:ExllRjgxM/piMAM+3tAZvg8fsklGAf3tPfi+i8t68Nk= github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk= github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo= github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.1-0.20171106142849-4c012f6dcd95/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= @@ -906,6 +921,7 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= +github.com/spf13/viper v1.8.1 h1:Kq1fyeebqsBfbjZj4EL7gj2IO0mMaiyjYUWcUsl2O44= github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= @@ -913,6 +929,7 @@ github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3 github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx 
v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -921,6 +938,7 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= @@ -940,6 +958,8 @@ github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw= +github.com/vektra/mockery/v2 v2.9.4 h1:ZjpYWY+YLkDIKrKtFnYPxJax10lktcUapWZtOSg4g7g= +github.com/vektra/mockery/v2 v2.9.4/go.mod h1:2gU4Cf/f8YyC8oEaSXfCnZBMxMjMl/Ko205rlP0fO90= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= @@ -954,6 +974,7 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yvasiyarov/go-metrics v0.0.0-20140926110328-57bccd1ccd43/go.mod h1:aX5oPXxHm3bOH+xeAttToC8pqch2ScQN/JoXYupl6xs= github.com/yvasiyarov/gorelic v0.0.0-20141212073537-a9bba5b9ab50/go.mod h1:NUSPSUX/bi6SeDMUh6brw0nXpxHnc96TguQh0+r/ssA= github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go.mod h1:GlGEuHIJweS1mbCqG+7vt2nvWLzLLnRHbXz5JKd/Qbg= +github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= @@ -1301,6 +1322,7 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20190624222133-a101b041ded4/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20190828213141-aed303cbaa74/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools 
v0.0.0-20191004055002-72853e10c5a3/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -1328,6 +1350,7 @@ golang.org/x/tools v0.0.0-20200224181240-023911ca70b2/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20200227222343-706bc42d1f0d/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw= golang.org/x/tools v0.0.0-20200312045724-11d5b4c81c7d/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw= +golang.org/x/tools v0.0.0-20200323144430-8dcfad9e016e/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8= golang.org/x/tools v0.0.0-20200331025713-a30bf2db82d4/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8= golang.org/x/tools v0.0.0-20200501065659-ab2804fb9c9d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200505023115-26f46d2f7ef8/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= @@ -1501,6 +1524,7 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.51.1/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU= gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= diff --git a/hack/actions/check-uncommitted-codegen.sh b/hack/actions/check-uncommitted-codegen.sh index 8bea5f8d7ef..632b62ef116 100755 --- a/hack/actions/check-uncommitted-codegen.sh +++ b/hack/actions/check-uncommitted-codegen.sh @@ -7,6 +7,8 @@ set -o pipefail readonly HERE=$(cd $(dirname $0) && pwd) readonly REPO=$(cd ${HERE}/../.. && pwd) +mock_dirs=$(find ${REPO} -name mocks -type d) + declare -r -a TARGETS=( ${REPO}/apis ${REPO}/site/content/guides/metrics @@ -14,6 +16,7 @@ declare -r -a TARGETS=( ${REPO}/examples/contour ${REPO}/examples/gateway ${REPO}/site/content/docs/main/config/api-reference.html + ${mock_dirs} ) if git status -s ${TARGETS[@]} 2>&1 | grep -E -q '^\s+[MADRCU]' diff --git a/internal/contour/handler.go b/internal/contour/handler.go index cc2c51ab4e1..157ed04840b 100644 --- a/internal/contour/handler.go +++ b/internal/contour/handler.go @@ -17,6 +17,7 @@ package contour import ( + "context" "time" "github.com/google/go-cmp/cmp" @@ -28,38 +29,49 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +type EventHandlerConfig struct { + Logger logrus.FieldLogger + Builder *dag.Builder + Observer dag.Observer + HoldoffDelay, HoldoffMaxDelay time.Duration + StatusUpdater k8s.StatusUpdater +} + // EventHandler implements cache.ResourceEventHandler, filters k8s events towards // a dag.Builder and calls through to the Observer to notify it that a new DAG // is available. type EventHandler struct { - Builder dag.Builder - Observer dag.Observer + builder *dag.Builder + observer dag.Observer - HoldoffDelay, HoldoffMaxDelay time.Duration + holdoffDelay, holdoffMaxDelay time.Duration - StatusUpdater k8s.StatusUpdater + statusUpdater k8s.StatusUpdater logrus.FieldLogger - // IsLeader will become ready to read when this EventHandler becomes - // the leader. If IsLeader is not readable, or nil, status events will - // be suppressed. 
- IsLeader chan struct{} - update chan interface{} - // Sequence is a channel that receives a incrementing sequence number - // for each update processed. The updates may be processed immediately, or - // delayed by a holdoff timer. In each case a non blocking send to Sequence - // will be made once the resource update is received (note - // that the DAG is not guaranteed to be called each time). - Sequence chan int + sequence chan int // seq is the sequence counter of the number of times // an event has been received. seq int } +func NewEventHandler(config EventHandlerConfig) *EventHandler { + return &EventHandler{ + FieldLogger: config.Logger, + builder: config.Builder, + observer: config.Observer, + holdoffDelay: config.HoldoffDelay, + holdoffMaxDelay: config.HoldoffMaxDelay, + statusUpdater: config.StatusUpdater, + update: make(chan interface{}), + sequence: make(chan int, 1), + } +} + type opAdd struct { obj interface{} } @@ -84,20 +96,18 @@ func (e *EventHandler) OnDelete(obj interface{}) { e.update <- opDelete{obj: obj} } -// UpdateNow enqueues a DAG update subject to the holdoff timer. -func (e *EventHandler) UpdateNow() { - e.update <- true +func (e *EventHandler) NeedLeaderElection() bool { + return false } -// Start initializes the EventHandler and returns a function suitable -// for registration with a workgroup.Group. -func (e *EventHandler) Start() func(<-chan struct{}) error { - e.update = make(chan interface{}) - return e.run +// Implements leadership.NeedLeaderElectionNotification +func (e *EventHandler) OnElectedLeader() { + // Trigger an update when we are elected leader to ensure resource + // statuses are not stale. + e.update <- true } -// run is the main event handling loop. -func (e *EventHandler) run(stop <-chan struct{}) error { +func (e *EventHandler) Start(ctx context.Context) error { e.Info("started event handler") defer e.Info("stopped event handler") @@ -143,8 +153,8 @@ func (e *EventHandler) run(stop <-chan struct{}) error { timer.Stop() } - delay := e.HoldoffDelay - if time.Since(lastDAGRebuild) > e.HoldoffMaxDelay { + delay := e.holdoffDelay + if time.Since(lastDAGRebuild) > e.holdoffMaxDelay { // the maximum holdoff delay has been exceeded so schedule the update // immediately by delaying for 0ns. delay = 0 @@ -161,7 +171,7 @@ func (e *EventHandler) run(stop <-chan struct{}) error { e.rebuildDAG() e.incSequence() lastDAGRebuild = time.Now() - case <-stop: + case <-ctx.Done(): // shutdown return nil } @@ -174,7 +184,7 @@ func (e *EventHandler) run(stop <-chan struct{}) error { func (e *EventHandler) onUpdate(op interface{}) bool { switch op := op.(type) { case opAdd: - return e.Builder.Source.Insert(op.obj) + return e.builder.Source.Insert(op.obj) case opUpdate: if cmp.Equal(op.oldObj, op.newObj, cmpopts.IgnoreFields(contour_api_v1.HTTPProxy{}, "Status"), @@ -184,11 +194,11 @@ func (e *EventHandler) onUpdate(op interface{}) bool { e.WithField("op", "update").Debugf("%T skipping update, only status has changed", op.newObj) return false } - remove := e.Builder.Source.Remove(op.oldObj) - insert := e.Builder.Source.Insert(op.newObj) + remove := e.builder.Source.Remove(op.oldObj) + insert := e.builder.Source.Insert(op.newObj) return remove || insert case opDelete: - return e.Builder.Source.Remove(op.obj) + return e.builder.Source.Remove(op.obj) case bool: return op default: @@ -196,11 +206,20 @@ func (e *EventHandler) onUpdate(op interface{}) bool { } } +// Sequence returns a channel that receives a incrementing sequence number +// for each update processed. 
The updates may be processed immediately, or +// delayed by a holdoff timer. In each case a non blocking send to the +// sequence channel will be made once the resource update is received (note +// that the DAG is not guaranteed to be called each time). +func (e *EventHandler) Sequence() <-chan int { + return e.sequence +} + // incSequence bumps the sequence counter and sends it to e.Sequence. func (e *EventHandler) incSequence() { e.seq++ select { - case e.Sequence <- e.seq: + case e.sequence <- e.seq: // This is a non blocking send so if this field is nil, or the // receiver is not ready this send does not block incSequence's caller. default: @@ -210,10 +229,10 @@ func (e *EventHandler) incSequence() { // rebuildDAG builds a new DAG and sends it to the Observer, // the updates the status on objects, and updates the metrics. func (e *EventHandler) rebuildDAG() { - latestDAG := e.Builder.Build() - e.Observer.OnChange(latestDAG) + latestDAG := e.builder.Build() + e.observer.OnChange(latestDAG) for _, upd := range latestDAG.StatusCache.GetStatusUpdates() { - e.StatusUpdater.Send(upd) + e.statusUpdater.Send(upd) } } diff --git a/internal/contour/handler_test.go b/internal/contour/handler_test.go new file mode 100644 index 00000000000..293b17ad241 --- /dev/null +++ b/internal/contour/handler_test.go @@ -0,0 +1,26 @@ +// Copyright Project Contour Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package contour + +import ( + "testing" + + "github.com/stretchr/testify/require" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +func TestEventHandlerNotRequireLeaderElection(t *testing.T) { + var e manager.LeaderElectionRunnable = &EventHandler{} + require.False(t, e.NeedLeaderElection()) +} diff --git a/internal/contour/metrics.go b/internal/contour/metrics.go index 0a8f921315e..a4c69f3c1f6 100644 --- a/internal/contour/metrics.go +++ b/internal/contour/metrics.go @@ -61,29 +61,40 @@ func (e *EventRecorder) recordOperation(op string, obj interface{}) { // RebuildMetricsObserver is a dag.Observer that emits metrics for DAG rebuilds. type RebuildMetricsObserver struct { // Metrics to emit. - Metrics *metrics.Metrics + metrics *metrics.Metrics - // IsLeader will become ready to read when this EventHandler becomes - // the leader. If IsLeader is not readable, or nil, status events will + // httpProxyMetricsEnabled will become ready to read when this EventHandler becomes + // the leader. If httpProxyMetricsEnabled is not readable, or nil, status events will // be suppressed. - IsLeader chan struct{} + httpProxyMetricsEnabled chan struct{} // NextObserver contains the stack of dag.Observers that act on DAG rebuilds. 
- NextObserver dag.Observer + nextObserver dag.Observer +} + +func NewRebuildMetricsObserver(metrics *metrics.Metrics, nextObserver dag.Observer) *RebuildMetricsObserver { + return &RebuildMetricsObserver{ + metrics: metrics, + nextObserver: nextObserver, + httpProxyMetricsEnabled: make(chan struct{}), + } +} + +func (m *RebuildMetricsObserver) OnElectedLeader() { + close(m.httpProxyMetricsEnabled) } func (m *RebuildMetricsObserver) OnChange(d *dag.DAG) { - m.Metrics.SetDAGLastRebuilt(time.Now()) - m.Metrics.SetDAGRebuiltTotal() + m.metrics.SetDAGLastRebuilt(time.Now()) + m.metrics.SetDAGRebuiltTotal() - timer := prometheus.NewTimer(m.Metrics.CacheHandlerOnUpdateSummary) - m.NextObserver.OnChange(d) + timer := prometheus.NewTimer(m.metrics.CacheHandlerOnUpdateSummary) + m.nextObserver.OnChange(d) timer.ObserveDuration() select { - // If we are leader, the IsLeader channel is closed. - case <-m.IsLeader: - m.Metrics.SetHTTPProxyMetric(calculateRouteMetric(d.StatusCache.GetProxyUpdates())) + case <-m.httpProxyMetricsEnabled: + m.metrics.SetHTTPProxyMetric(calculateRouteMetric(d.StatusCache.GetProxyUpdates())) default: } } diff --git a/internal/controller/controller.go b/internal/controller/controller.go new file mode 100644 index 00000000000..a9d572c8e69 --- /dev/null +++ b/internal/controller/controller.go @@ -0,0 +1,30 @@ +// Copyright Project Contour Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import "sigs.k8s.io/controller-runtime/pkg/controller" + +// Wrapper for that ensures controller-runtime Controllers +// are run by controller-runtime Manager, regardless of +// leader-election status. Controllers can be created as +// unmanaged and manually registered with a Manager using +// this wrapper, otherwise they will only be run when their +// Manager is elected leader. +type noLeaderElectionController struct { + controller.Controller +} + +func (*noLeaderElectionController) NeedLeaderElection() bool { + return false +} diff --git a/internal/controller/controller_test.go b/internal/controller/controller_test.go new file mode 100644 index 00000000000..ba18ea33857 --- /dev/null +++ b/internal/controller/controller_test.go @@ -0,0 +1,67 @@ +// Copyright Project Contour Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package controller_test + +import ( + "testing" + + logr_testing "github.com/go-logr/logr/testing" + "github.com/projectcontour/contour/internal/controller" + "github.com/projectcontour/contour/internal/controller/mocks" + "github.com/projectcontour/contour/internal/fixture" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +//go:generate go run github.com/vektra/mockery/v2 --case=snake --name=Manager --srcpkg=sigs.k8s.io/controller-runtime/pkg/manager + +func TestRegisterControllers(t *testing.T) { + tests := map[string]func(*mocks.Manager) error{ + "gateway controller": func(mockManager *mocks.Manager) error { + _, err := controller.RegisterGatewayController(fixture.NewTestLogger(t), mockManager, nil, nil, "some-controller") + return err + }, + "gatewayclass controller": func(mockManager *mocks.Manager) error { + _, err := controller.RegisterGatewayClassController(fixture.NewTestLogger(t), mockManager, nil, nil, "some-gateway") + return err + }, + "httproute controller": func(mockManager *mocks.Manager) error { + return controller.RegisterHTTPRouteController(fixture.NewTestLogger(t), mockManager, nil) + }, + "tlsroute controller": func(mockManager *mocks.Manager) error { + return controller.RegisterTLSRouteController(fixture.NewTestLogger(t), mockManager, nil) + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + mockManager := &mocks.Manager{} + + // TODO: see if there is a way we can automatically ignore these. + mockManager.On("GetClient").Return(nil).Maybe() + mockManager.On("GetLogger").Return(logr_testing.TestLogger{T: t}).Maybe() + mockManager.On("SetFields", mock.Anything).Return(nil).Maybe() + mockManager.On("Elected").Return(nil).Maybe() + + mockManager.On("Add", mock.MatchedBy(func(r manager.LeaderElectionRunnable) bool { + return r.NeedLeaderElection() == false + })).Return(nil).Once() + + require.NoError(t, test(mockManager)) + + require.True(t, mockManager.AssertExpectations(t)) + }) + } +} diff --git a/internal/controller/gateway.go b/internal/controller/gateway.go index 6f6281a5320..3af4e5f51ae 100644 --- a/internal/controller/gateway.go +++ b/internal/controller/gateway.go @@ -19,6 +19,7 @@ import ( "time" "github.com/projectcontour/contour/internal/k8s" + "github.com/projectcontour/contour/internal/leadership" "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -35,38 +36,43 @@ import ( ) type gatewayReconciler struct { - ctx context.Context client client.Client eventHandler cache.ResourceEventHandler statusUpdater k8s.StatusUpdater log logrus.FieldLogger - // gatewayClassControllerName is the configured controller of managed gatewayclasses. gatewayClassControllerName gatewayapi_v1alpha2.GatewayController + eventSource chan event.GenericEvent } -// NewGatewayController creates the gateway controller from mgr. The controller will be pre-configured +// RegisterGatewayController creates the gateway controller from mgr. The controller will be pre-configured // to watch for Gateway objects across all namespaces and reconcile those that match class. 
-func NewGatewayController( +func RegisterGatewayController( + log logrus.FieldLogger, mgr manager.Manager, eventHandler cache.ResourceEventHandler, statusUpdater k8s.StatusUpdater, - log logrus.FieldLogger, gatewayClassControllerName string, - isLeader <-chan struct{}, -) (controller.Controller, error) { +) (leadership.NeedLeaderElectionNotification, error) { r := &gatewayReconciler{ - ctx: context.Background(), + log: log, client: mgr.GetClient(), eventHandler: eventHandler, statusUpdater: statusUpdater, - log: log, gatewayClassControllerName: gatewayapi_v1alpha2.GatewayController(gatewayClassControllerName), + // Set up a source.Channel that will trigger reconciles + // for all GatewayClasses when this Contour process is + // elected leader, to ensure that their statuses are up + // to date. + eventSource: make(chan event.GenericEvent), } - c, err := controller.New("gateway-controller", mgr, controller.Options{Reconciler: r}) + c, err := controller.NewUnmanaged("gateway-controller", mgr, controller.Options{Reconciler: r}) if err != nil { return nil, err } + if err := mgr.Add(&noLeaderElectionController{c}); err != nil { + return nil, err + } if err := c.Watch( &source.Kind{Type: &gatewayapi_v1alpha2.Gateway{}}, @@ -90,31 +96,29 @@ func NewGatewayController( // for all Gateways when this Contour process is // elected leader, to ensure that their statuses are up // to date. - eventSource := make(chan event.GenericEvent) - go func() { - <-isLeader - log.Info("elected leader, triggering reconciles for all gateways") - - var gateways gatewayapi_v1alpha2.GatewayList - if err := r.client.List(context.Background(), &gateways); err != nil { - log.WithError(err).Error("error listing gateways") - return - } - - for i := range gateways.Items { - eventSource <- event.GenericEvent{Object: &gateways.Items[i]} - } - }() - if err := c.Watch( - &source.Channel{Source: eventSource}, + &source.Channel{Source: r.eventSource}, &handler.EnqueueRequestForObject{}, predicate.NewPredicateFuncs(r.hasMatchingController), ); err != nil { return nil, err } - return c, nil + return r, nil +} + +func (r *gatewayReconciler) OnElectedLeader() { + r.log.Info("elected leader, triggering reconciles for all gateways") + + var gateways gatewayapi_v1alpha2.GatewayList + if err := r.client.List(context.Background(), &gateways); err != nil { + r.log.WithError(err).Error("error listing gateways") + return + } + + for i := range gateways.Items { + r.eventSource <- event.GenericEvent{Object: &gateways.Items[i]} + } } func (r *gatewayReconciler) mapGatewayClassToGateways(gatewayClass client.Object) []reconcile.Request { diff --git a/internal/controller/gatewayclass.go b/internal/controller/gatewayclass.go index 98bc8960f9d..882f092399e 100644 --- a/internal/controller/gatewayclass.go +++ b/internal/controller/gatewayclass.go @@ -18,6 +18,7 @@ import ( "fmt" "github.com/projectcontour/contour/internal/k8s" + "github.com/projectcontour/contour/internal/leadership" "github.com/projectcontour/contour/internal/status" "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -40,31 +41,39 @@ type gatewayClassReconciler struct { statusUpdater k8s.StatusUpdater log logrus.FieldLogger controller gatewayapi_v1alpha2.GatewayController + eventSource chan event.GenericEvent } -// NewGatewayClassController creates the gatewayclass controller. The controller +// RegisterGatewayClassController creates the gatewayclass controller. 
The controller // will be pre-configured to watch for cluster-scoped GatewayClass objects with // a controller field that matches name. -func NewGatewayClassController( +func RegisterGatewayClassController( + log logrus.FieldLogger, mgr manager.Manager, eventHandler cache.ResourceEventHandler, statusUpdater k8s.StatusUpdater, - log logrus.FieldLogger, name string, - isLeader <-chan struct{}, -) (controller.Controller, error) { +) (leadership.NeedLeaderElectionNotification, error) { r := &gatewayClassReconciler{ client: mgr.GetClient(), eventHandler: eventHandler, statusUpdater: statusUpdater, log: log, controller: gatewayapi_v1alpha2.GatewayController(name), + // Set up a source.Channel that will trigger reconciles + // for all GatewayClasses when this Contour process is + // elected leader, to ensure that their statuses are up + // to date. + eventSource: make(chan event.GenericEvent), } - c, err := controller.New("gatewayclass-controller", mgr, controller.Options{Reconciler: r}) + c, err := controller.NewUnmanaged("gatewayclass-controller", mgr, controller.Options{Reconciler: r}) if err != nil { return nil, err } + if err := mgr.Add(&noLeaderElectionController{c}); err != nil { + return nil, err + } // Only enqueue GatewayClass objects that match name. if err := c.Watch( @@ -75,35 +84,29 @@ func NewGatewayClassController( return nil, err } - // Set up a source.Channel that will trigger reconciles - // for all GatewayClasses when this Contour process is - // elected leader, to ensure that their statuses are up - // to date. - eventSource := make(chan event.GenericEvent) - go func() { - <-isLeader - log.Info("elected leader, triggering reconciles for all gatewayclasses") - - var gatewayClasses gatewayapi_v1alpha2.GatewayClassList - if err := r.client.List(context.Background(), &gatewayClasses); err != nil { - log.WithError(err).Error("error listing gatewayclasses") - return - } - - for i := range gatewayClasses.Items { - eventSource <- event.GenericEvent{Object: &gatewayClasses.Items[i]} - } - }() - if err := c.Watch( - &source.Channel{Source: eventSource}, + &source.Channel{Source: r.eventSource}, &handler.EnqueueRequestForObject{}, predicate.NewPredicateFuncs(r.hasMatchingController), ); err != nil { return nil, err } - return c, nil + return r, nil +} + +func (r *gatewayClassReconciler) OnElectedLeader() { + r.log.Info("elected leader, triggering reconciles for all gatewayclasses") + + var gatewayClasses gatewayapi_v1alpha2.GatewayClassList + if err := r.client.List(context.Background(), &gatewayClasses); err != nil { + r.log.WithError(err).Error("error listing gatewayclasses") + return + } + + for i := range gatewayClasses.Items { + r.eventSource <- event.GenericEvent{Object: &gatewayClasses.Items[i]} + } } // hasMatchingController returns true if the provided object is a GatewayClass diff --git a/internal/controller/httproute.go b/internal/controller/httproute.go index b1e895f2fef..2780bed7484 100644 --- a/internal/controller/httproute.go +++ b/internal/controller/httproute.go @@ -36,22 +36,26 @@ type httpRouteReconciler struct { logrus.FieldLogger } -// NewHTTPRouteController creates the httproute controller from mgr. The controller will be pre-configured +// RegisterHTTPRouteController creates the httproute controller from mgr. The controller will be pre-configured // to watch for HTTPRoute objects across all namespaces. 
-func NewHTTPRouteController(mgr manager.Manager, eventHandler cache.ResourceEventHandler, log logrus.FieldLogger) (controller.Controller, error) { +func RegisterHTTPRouteController(log logrus.FieldLogger, mgr manager.Manager, eventHandler cache.ResourceEventHandler) error { r := &httpRouteReconciler{ client: mgr.GetClient(), eventHandler: eventHandler, FieldLogger: log, } - c, err := controller.New("httproute-controller", mgr, controller.Options{Reconciler: r}) + c, err := controller.NewUnmanaged("httproute-controller", mgr, controller.Options{Reconciler: r}) if err != nil { - return nil, err + return err } + if err := mgr.Add(&noLeaderElectionController{c}); err != nil { + return err + } + if err := c.Watch(&source.Kind{Type: &gatewayapi_v1alpha2.HTTPRoute{}}, &handler.EnqueueRequestForObject{}); err != nil { - return nil, err + return err } - return c, nil + return nil } func (r *httpRouteReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { diff --git a/internal/controller/mocks/manager.go b/internal/controller/mocks/manager.go new file mode 100644 index 00000000000..f677619ac82 --- /dev/null +++ b/internal/controller/mocks/manager.go @@ -0,0 +1,311 @@ +// Code generated by mockery v2.9.4. DO NOT EDIT. + +package mocks + +import ( + cache "sigs.k8s.io/controller-runtime/pkg/cache" + client "sigs.k8s.io/controller-runtime/pkg/client" + + context "context" + + healthz "sigs.k8s.io/controller-runtime/pkg/healthz" + + http "net/http" + + logr "github.com/go-logr/logr" + + manager "sigs.k8s.io/controller-runtime/pkg/manager" + + meta "k8s.io/apimachinery/pkg/api/meta" + + mock "github.com/stretchr/testify/mock" + + record "k8s.io/client-go/tools/record" + + rest "k8s.io/client-go/rest" + + runtime "k8s.io/apimachinery/pkg/runtime" + + v1alpha1 "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1" + + webhook "sigs.k8s.io/controller-runtime/pkg/webhook" +) + +// Manager is an autogenerated mock type for the Manager type +type Manager struct { + mock.Mock +} + +// Add provides a mock function with given fields: _a0 +func (_m *Manager) Add(_a0 manager.Runnable) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(manager.Runnable) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// AddHealthzCheck provides a mock function with given fields: name, check +func (_m *Manager) AddHealthzCheck(name string, check healthz.Checker) error { + ret := _m.Called(name, check) + + var r0 error + if rf, ok := ret.Get(0).(func(string, healthz.Checker) error); ok { + r0 = rf(name, check) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// AddMetricsExtraHandler provides a mock function with given fields: path, handler +func (_m *Manager) AddMetricsExtraHandler(path string, handler http.Handler) error { + ret := _m.Called(path, handler) + + var r0 error + if rf, ok := ret.Get(0).(func(string, http.Handler) error); ok { + r0 = rf(path, handler) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// AddReadyzCheck provides a mock function with given fields: name, check +func (_m *Manager) AddReadyzCheck(name string, check healthz.Checker) error { + ret := _m.Called(name, check) + + var r0 error + if rf, ok := ret.Get(0).(func(string, healthz.Checker) error); ok { + r0 = rf(name, check) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Elected provides a mock function with given fields: +func (_m *Manager) Elected() <-chan struct{} { + ret := _m.Called() + + var r0 <-chan struct{} + if rf, ok := 
ret.Get(0).(func() <-chan struct{}); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan struct{}) + } + } + + return r0 +} + +// GetAPIReader provides a mock function with given fields: +func (_m *Manager) GetAPIReader() client.Reader { + ret := _m.Called() + + var r0 client.Reader + if rf, ok := ret.Get(0).(func() client.Reader); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(client.Reader) + } + } + + return r0 +} + +// GetCache provides a mock function with given fields: +func (_m *Manager) GetCache() cache.Cache { + ret := _m.Called() + + var r0 cache.Cache + if rf, ok := ret.Get(0).(func() cache.Cache); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(cache.Cache) + } + } + + return r0 +} + +// GetClient provides a mock function with given fields: +func (_m *Manager) GetClient() client.Client { + ret := _m.Called() + + var r0 client.Client + if rf, ok := ret.Get(0).(func() client.Client); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(client.Client) + } + } + + return r0 +} + +// GetConfig provides a mock function with given fields: +func (_m *Manager) GetConfig() *rest.Config { + ret := _m.Called() + + var r0 *rest.Config + if rf, ok := ret.Get(0).(func() *rest.Config); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*rest.Config) + } + } + + return r0 +} + +// GetControllerOptions provides a mock function with given fields: +func (_m *Manager) GetControllerOptions() v1alpha1.ControllerConfigurationSpec { + ret := _m.Called() + + var r0 v1alpha1.ControllerConfigurationSpec + if rf, ok := ret.Get(0).(func() v1alpha1.ControllerConfigurationSpec); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(v1alpha1.ControllerConfigurationSpec) + } + + return r0 +} + +// GetEventRecorderFor provides a mock function with given fields: name +func (_m *Manager) GetEventRecorderFor(name string) record.EventRecorder { + ret := _m.Called(name) + + var r0 record.EventRecorder + if rf, ok := ret.Get(0).(func(string) record.EventRecorder); ok { + r0 = rf(name) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(record.EventRecorder) + } + } + + return r0 +} + +// GetFieldIndexer provides a mock function with given fields: +func (_m *Manager) GetFieldIndexer() client.FieldIndexer { + ret := _m.Called() + + var r0 client.FieldIndexer + if rf, ok := ret.Get(0).(func() client.FieldIndexer); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(client.FieldIndexer) + } + } + + return r0 +} + +// GetLogger provides a mock function with given fields: +func (_m *Manager) GetLogger() logr.Logger { + ret := _m.Called() + + var r0 logr.Logger + if rf, ok := ret.Get(0).(func() logr.Logger); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(logr.Logger) + } + } + + return r0 +} + +// GetRESTMapper provides a mock function with given fields: +func (_m *Manager) GetRESTMapper() meta.RESTMapper { + ret := _m.Called() + + var r0 meta.RESTMapper + if rf, ok := ret.Get(0).(func() meta.RESTMapper); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(meta.RESTMapper) + } + } + + return r0 +} + +// GetScheme provides a mock function with given fields: +func (_m *Manager) GetScheme() *runtime.Scheme { + ret := _m.Called() + + var r0 *runtime.Scheme + if rf, ok := ret.Get(0).(func() *runtime.Scheme); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*runtime.Scheme) + } + } + + return r0 +} + +// GetWebhookServer provides a 
mock function with given fields: +func (_m *Manager) GetWebhookServer() *webhook.Server { + ret := _m.Called() + + var r0 *webhook.Server + if rf, ok := ret.Get(0).(func() *webhook.Server); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*webhook.Server) + } + } + + return r0 +} + +// SetFields provides a mock function with given fields: _a0 +func (_m *Manager) SetFields(_a0 interface{}) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Start provides a mock function with given fields: ctx +func (_m *Manager) Start(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/internal/controller/tlsroute.go b/internal/controller/tlsroute.go index 492d496b8f0..8e7be8cd41f 100644 --- a/internal/controller/tlsroute.go +++ b/internal/controller/tlsroute.go @@ -36,22 +36,26 @@ type tlsRouteReconciler struct { logrus.FieldLogger } -// NewTLSRouteController creates the tlsroute controller from mgr. The controller will be pre-configured +// RegisterTLSRouteController creates the tlsroute controller from mgr. The controller will be pre-configured // to watch for TLSRoute objects across all namespaces. -func NewTLSRouteController(mgr manager.Manager, eventHandler cache.ResourceEventHandler, log logrus.FieldLogger) (controller.Controller, error) { +func RegisterTLSRouteController(log logrus.FieldLogger, mgr manager.Manager, eventHandler cache.ResourceEventHandler) error { r := &tlsRouteReconciler{ client: mgr.GetClient(), eventHandler: eventHandler, FieldLogger: log, } - c, err := controller.New("tlsroute-controller", mgr, controller.Options{Reconciler: r}) + c, err := controller.NewUnmanaged("tlsroute-controller", mgr, controller.Options{Reconciler: r}) if err != nil { - return nil, err + return err } + if err := mgr.Add(&noLeaderElectionController{c}); err != nil { + return err + } + if err := c.Watch(&source.Kind{Type: &gatewayapi_v1alpha2.TLSRoute{}}, &handler.EnqueueRequestForObject{}); err != nil { - return nil, err + return err } - return c, nil + return nil } func (r *tlsRouteReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { diff --git a/internal/debug/debug.go b/internal/debug/debug.go index 26922cfc2af..da6d1cffcdb 100644 --- a/internal/debug/debug.go +++ b/internal/debug/debug.go @@ -16,6 +16,7 @@ package debug import ( + "context" "net/http" "net/http/pprof" @@ -30,12 +31,16 @@ type Service struct { Builder *dag.Builder } -// Start fulfills the g.Start contract. -// When stop is closed the http server will shutdown. -func (svc *Service) Start(stop <-chan struct{}) error { +func (svc *Service) NeedLeaderElection() bool { + return false +} + +// Implements controller-runtime Runnable interface. +// When context is done, http server will shutdown. 
+func (svc *Service) Start(ctx context.Context) error { registerProfile(&svc.ServeMux) registerDotWriter(&svc.ServeMux, svc.Builder) - return svc.Service.Start(stop) + return svc.Service.Start(ctx) } func registerProfile(mux *http.ServeMux) { diff --git a/internal/debug/debug_test.go b/internal/debug/debug_test.go new file mode 100644 index 00000000000..0a93e01ddef --- /dev/null +++ b/internal/debug/debug_test.go @@ -0,0 +1,29 @@ +// Copyright Project Contour Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package debug provides http endpoints for healthcheck, metrics, +// and pprof debugging. +package debug_test + +import ( + "testing" + + "github.com/projectcontour/contour/internal/debug" + "github.com/stretchr/testify/require" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +func TestDebugServiceNotRequireLeaderElection(t *testing.T) { + var s manager.LeaderElectionRunnable = &debug.Service{} + require.False(t, s.NeedLeaderElection()) +} diff --git a/internal/featuretests/v3/backendclientauth_test.go b/internal/featuretests/v3/backendclientauth_test.go index c01807c6a30..11e69b24ea8 100644 --- a/internal/featuretests/v3/backendclientauth_test.go +++ b/internal/featuretests/v3/backendclientauth_test.go @@ -20,7 +20,6 @@ import ( envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" projcontour "github.com/projectcontour/contour/apis/projectcontour/v1" "github.com/projectcontour/contour/apis/projectcontour/v1alpha1" - "github.com/projectcontour/contour/internal/contour" "github.com/projectcontour/contour/internal/dag" envoy_v3 "github.com/projectcontour/contour/internal/envoy/v3" "github.com/projectcontour/contour/internal/featuretests" @@ -33,8 +32,8 @@ import ( "k8s.io/utils/pointer" ) -func proxyClientCertificateOpt(t *testing.T) func(eh *contour.EventHandler) { - return func(eh *contour.EventHandler) { +func proxyClientCertificateOpt(t *testing.T) func(*dag.Builder) { + return func(b *dag.Builder) { secret := types.NamespacedName{ Name: "envoyclientsecret", Namespace: "default", @@ -43,7 +42,7 @@ func proxyClientCertificateOpt(t *testing.T) func(eh *contour.EventHandler) { log := fixture.NewTestLogger(t) log.SetLevel(logrus.DebugLevel) - eh.Builder.Processors = []dag.Processor{ + b.Processors = []dag.Processor{ &dag.IngressProcessor{ ClientCertificate: &secret, FieldLogger: log.WithField("context", "IngressProcessor"), diff --git a/internal/featuretests/v3/externalname_test.go b/internal/featuretests/v3/externalname_test.go index 0ddfe531446..ef142c8e573 100644 --- a/internal/featuretests/v3/externalname_test.go +++ b/internal/featuretests/v3/externalname_test.go @@ -16,7 +16,6 @@ package v3 import ( "testing" - "github.com/projectcontour/contour/internal/contour" "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/featuretests" "github.com/sirupsen/logrus" @@ -321,13 +320,13 @@ func TestExternalNameService(t *testing.T) { }) } -func enableExternalNameService(t *testing.T) func(eh 
*contour.EventHandler) { - return func(eh *contour.EventHandler) { +func enableExternalNameService(t *testing.T) func(*dag.Builder) { + return func(b *dag.Builder) { log := fixture.NewTestLogger(t) log.SetLevel(logrus.DebugLevel) - eh.Builder.Processors = []dag.Processor{ + b.Processors = []dag.Processor{ &dag.IngressProcessor{ EnableExternalNameService: true, FieldLogger: log.WithField("context", "IngressProcessor"), diff --git a/internal/featuretests/v3/fallbackcert_test.go b/internal/featuretests/v3/fallbackcert_test.go index 3ac8f71a486..4ad64c946d6 100644 --- a/internal/featuretests/v3/fallbackcert_test.go +++ b/internal/featuretests/v3/fallbackcert_test.go @@ -21,7 +21,6 @@ import ( envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" contour_api_v1 "github.com/projectcontour/contour/apis/projectcontour/v1" - "github.com/projectcontour/contour/internal/contour" "github.com/projectcontour/contour/internal/dag" envoy_v3 "github.com/projectcontour/contour/internal/envoy/v3" "github.com/projectcontour/contour/internal/featuretests" @@ -32,8 +31,8 @@ import ( ) func TestFallbackCertificate(t *testing.T) { - rh, c, done := setup(t, func(eh *contour.EventHandler) { - eh.Builder.Processors = []dag.Processor{ + rh, c, done := setup(t, func(b *dag.Builder) { + b.Processors = []dag.Processor{ &dag.IngressProcessor{}, &dag.HTTPProxyProcessor{ FallbackCertificate: &types.NamespacedName{ diff --git a/internal/featuretests/v3/featuretests.go b/internal/featuretests/v3/featuretests.go index be0b9a3b350..6ff7d4675c3 100644 --- a/internal/featuretests/v3/featuretests.go +++ b/internal/featuretests/v3/featuretests.go @@ -101,49 +101,44 @@ func setup(t *testing.T, opts ...interface{}) (cache.ResourceEventHandler, *Cont rand.Seed(time.Now().Unix()) - statusUpdateCacher := &k8s.StatusUpdateCacher{} - eh := &contour.EventHandler{ - IsLeader: make(chan struct{}), - StatusUpdater: statusUpdateCacher, - FieldLogger: log, - Sequence: make(chan int, 1), - //nolint:gosec - HoldoffDelay: time.Duration(rand.Intn(100)) * time.Millisecond, - //nolint:gosec - HoldoffMaxDelay: time.Duration(rand.Intn(500)) * time.Millisecond, - Observer: &contour.RebuildMetricsObserver{ - Metrics: metrics.NewMetrics(registry), - NextObserver: dag.ComposeObservers(xdscache.ObserversOf(resources)...), + builder := &dag.Builder{ + Source: dag.KubernetesCache{ + FieldLogger: log, }, - Builder: dag.Builder{ - Source: dag.KubernetesCache{ - FieldLogger: log, + Processors: []dag.Processor{ + &dag.IngressProcessor{ + FieldLogger: log.WithField("context", "IngressProcessor"), }, + &dag.ExtensionServiceProcessor{ + FieldLogger: log.WithField("context", "ExtensionServiceProcessor"), + }, + &dag.HTTPProxyProcessor{}, + &dag.GatewayAPIProcessor{ + FieldLogger: log.WithField("context", "GatewayAPIProcessor"), + }, + &dag.ListenerProcessor{}, }, } - - eh.Builder.Processors = []dag.Processor{ - &dag.IngressProcessor{ - FieldLogger: log.WithField("context", "IngressProcessor"), - }, - &dag.ExtensionServiceProcessor{ - FieldLogger: log.WithField("context", "ExtensionServiceProcessor"), - }, - &dag.HTTPProxyProcessor{}, - &dag.GatewayAPIProcessor{ - FieldLogger: log.WithField("context", "GatewayAPIProcessor"), - }, - &dag.ListenerProcessor{}, - } - for _, opt := range opts { - if opt, ok := opt.(func(*contour.EventHandler)); ok { - opt(eh) + if opt, ok := opt.(func(*dag.Builder)); ok { + opt(builder) } } - // Make this event 
handler win the leader election. - close(eh.IsLeader) + statusUpdateCacher := &k8s.StatusUpdateCacher{} + eh := contour.NewEventHandler(contour.EventHandlerConfig{ + Logger: log, + StatusUpdater: statusUpdateCacher, + //nolint:gosec + HoldoffDelay: time.Duration(rand.Intn(100)) * time.Millisecond, + //nolint:gosec + HoldoffMaxDelay: time.Duration(rand.Intn(500)) * time.Millisecond, + Observer: contour.NewRebuildMetricsObserver( + metrics.NewMetrics(registry), + dag.ComposeObservers(xdscache.ObserversOf(resources)...), + ), + Builder: builder, + }) l, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) @@ -160,7 +155,7 @@ func setup(t *testing.T, opts ...interface{}) (cache.ResourceEventHandler, *Cont }() return srv.Serve(l) // srv now owns l and will close l before returning }) - g.Add(eh.Start()) + g.AddContext(eh.Start) cc, err := grpc.Dial(l.Addr().String(), grpc.WithInsecure()) require.NoError(t, err) @@ -168,7 +163,7 @@ func setup(t *testing.T, opts ...interface{}) (cache.ResourceEventHandler, *Cont rh := &resourceEventHandler{ EventHandler: eh, EndpointsHandler: et, - Sequence: eh.Sequence, + Sequence: eh.Sequence(), statusUpdateCacher: statusUpdateCacher, } @@ -202,7 +197,7 @@ type resourceEventHandler struct { EventHandler cache.ResourceEventHandler EndpointsHandler cache.ResourceEventHandler - Sequence chan int + Sequence <-chan int statusUpdateCacher *k8s.StatusUpdateCacher } diff --git a/internal/featuretests/v3/globalratelimit_test.go b/internal/featuretests/v3/globalratelimit_test.go index 94d2b8559a9..cabbbc383f8 100644 --- a/internal/featuretests/v3/globalratelimit_test.go +++ b/internal/featuretests/v3/globalratelimit_test.go @@ -24,7 +24,6 @@ import ( envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/wellknown" contour_api_v1 "github.com/projectcontour/contour/apis/projectcontour/v1" - "github.com/projectcontour/contour/internal/contour" "github.com/projectcontour/contour/internal/dag" envoy_v3 "github.com/projectcontour/contour/internal/envoy/v3" "github.com/projectcontour/contour/internal/featuretests" @@ -668,8 +667,8 @@ func TestGlobalRateLimiting(t *testing.T) { Domain: "contour", } }, - func(eh *contour.EventHandler) { - eh.Builder.Processors = []dag.Processor{ + func(b *dag.Builder) { + b.Processors = []dag.Processor{ &dag.HTTPProxyProcessor{ FallbackCertificate: &types.NamespacedName{ Name: "fallback-cert", diff --git a/internal/featuretests/v3/ingressclass_test.go b/internal/featuretests/v3/ingressclass_test.go index 59f69ce051a..1268adb7ce1 100644 --- a/internal/featuretests/v3/ingressclass_test.go +++ b/internal/featuretests/v3/ingressclass_test.go @@ -20,6 +20,7 @@ import ( envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" contour_api_v1 "github.com/projectcontour/contour/apis/projectcontour/v1" "github.com/projectcontour/contour/internal/contour" + "github.com/projectcontour/contour/internal/dag" envoy_v3 "github.com/projectcontour/contour/internal/envoy/v3" "github.com/projectcontour/contour/internal/featuretests" "github.com/projectcontour/contour/internal/fixture" @@ -37,8 +38,8 @@ const ( ) func TestIngressClassAnnotation_Configured(t *testing.T) { - rh, c, done := setup(t, func(reh *contour.EventHandler) { - reh.Builder.Source.IngressClassName = "linkerd" + rh, c, done := setup(t, func(b *dag.Builder) { + b.Source.IngressClassName = "linkerd" }) defer done() @@ -502,8 +503,8 @@ func TestIngressClassAnnotation_NotConfigured(t 
*testing.T) { // TODO(youngnick)#2964: Disabled as part of #2495 work. func TestIngressClassAnnotationUpdate(t *testing.T) { t.Skip("Test disabled, see issue #2964") - rh, c, done := setup(t, func(reh *contour.EventHandler) { - reh.Builder.Source.IngressClassName = "contour" + rh, c, done := setup(t, func(b *dag.Builder) { + b.Source.IngressClassName = "contour" }) defer done() @@ -565,8 +566,8 @@ func TestIngressClassAnnotationUpdate(t *testing.T) { } func TestIngressClassResource_Configured(t *testing.T) { - rh, c, done := setup(t, func(reh *contour.EventHandler) { - reh.Builder.Source.IngressClassName = "testingressclass" + rh, c, done := setup(t, func(b *dag.Builder) { + b.Source.IngressClassName = "testingressclass" }) defer done() diff --git a/internal/featuretests/v3/rootnamespaces_test.go b/internal/featuretests/v3/rootnamespaces_test.go index b7661ea72bd..a1e1a01a6e6 100644 --- a/internal/featuretests/v3/rootnamespaces_test.go +++ b/internal/featuretests/v3/rootnamespaces_test.go @@ -19,7 +19,7 @@ import ( envoy_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" contour_api_v1 "github.com/projectcontour/contour/apis/projectcontour/v1" - "github.com/projectcontour/contour/internal/contour" + "github.com/projectcontour/contour/internal/dag" envoy_v3 "github.com/projectcontour/contour/internal/envoy/v3" "github.com/projectcontour/contour/internal/fixture" v1 "k8s.io/api/core/v1" @@ -28,8 +28,8 @@ import ( ) func TestRootNamespaces(t *testing.T) { - rh, c, done := setup(t, func(reh *contour.EventHandler) { - reh.Builder.Source.RootNamespaces = []string{"roots"} + rh, c, done := setup(t, func(b *dag.Builder) { + b.Source.RootNamespaces = []string{"roots"} }) defer done() diff --git a/internal/featuretests/v3/route_test.go b/internal/featuretests/v3/route_test.go index 28d1bae315c..62ac04bea84 100644 --- a/internal/featuretests/v3/route_test.go +++ b/internal/featuretests/v3/route_test.go @@ -22,7 +22,6 @@ import ( envoy_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" contour_api_v1 "github.com/projectcontour/contour/apis/projectcontour/v1" - "github.com/projectcontour/contour/internal/contour" "github.com/projectcontour/contour/internal/dag" envoy_v3 "github.com/projectcontour/contour/internal/envoy/v3" "github.com/projectcontour/contour/internal/featuretests" @@ -933,8 +932,8 @@ func TestDefaultBackendIsOverriddenByNoHostIngressRule(t *testing.T) { // in LDS or RDS, or even CDS, but this test mirrors the place it's // tested in internal/contour/route_test.go func TestRDSIngressClassAnnotation(t *testing.T) { - rh, c, done := setup(t, func(reh *contour.EventHandler) { - reh.Builder.Source.IngressClassName = "linkerd" + rh, c, done := setup(t, func(b *dag.Builder) { + b.Source.IngressClassName = "linkerd" }) defer done() @@ -1306,8 +1305,8 @@ func TestRouteWithTLS_InsecurePaths(t *testing.T) { } func TestRouteWithTLS_InsecurePaths_DisablePermitInsecureTrue(t *testing.T) { - rh, c, done := setup(t, func(reh *contour.EventHandler) { - reh.Builder.Processors = []dag.Processor{ + rh, c, done := setup(t, func(b *dag.Builder) { + b.Processors = []dag.Processor{ &dag.IngressProcessor{}, &dag.HTTPProxyProcessor{ DisablePermitInsecure: true, @@ -1678,8 +1677,8 @@ func TestHTTPProxyRouteWithTLS_InsecurePaths(t *testing.T) { } func 
TestHTTPProxyRouteWithTLS_InsecurePaths_DisablePermitInsecureTrue(t *testing.T) { - rh, c, done := setup(t, func(reh *contour.EventHandler) { - reh.Builder.Processors = []dag.Processor{ + rh, c, done := setup(t, func(b *dag.Builder) { + b.Processors = []dag.Processor{ &dag.IngressProcessor{}, &dag.HTTPProxyProcessor{ DisablePermitInsecure: true, diff --git a/internal/httpsvc/http.go b/internal/httpsvc/http.go index 527965a9b92..445d94b2cfc 100644 --- a/internal/httpsvc/http.go +++ b/internal/httpsvc/http.go @@ -43,9 +43,13 @@ type Service struct { http.ServeMux } -// Start fulfills the g.Start contract. -// When stop is closed the http server will shutdown. -func (svc *Service) Start(stop <-chan struct{}) (err error) { +func (svc *Service) NeedLeaderElection() bool { + return false +} + +// Implements controller-runtime Runnable interface. +// When context is done, http server will shutdown. +func (svc *Service) Start(ctx context.Context) (err error) { defer func() { if err != nil { svc.WithError(err).Error("terminated HTTP server with error") @@ -81,7 +85,7 @@ func (svc *Service) Start(stop <-chan struct{}) (err error) { go func() { // wait for stop signal from group. - <-stop + <-ctx.Done() // shutdown the server with 5 seconds grace. ctx := context.Background() diff --git a/internal/httpsvc/http_test.go b/internal/httpsvc/http_test.go index 753f9117cff..1d6bd601041 100644 --- a/internal/httpsvc/http_test.go +++ b/internal/httpsvc/http_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package httpsvc +package httpsvc_test import ( "context" @@ -25,13 +25,16 @@ import ( "time" "github.com/projectcontour/contour/internal/fixture" + "github.com/projectcontour/contour/internal/httpsvc" "github.com/projectcontour/contour/internal/workgroup" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/tsaarni/certyaml" + "sigs.k8s.io/controller-runtime/pkg/manager" ) func TestHTTPService(t *testing.T) { - svc := Service{ + svc := httpsvc.Service{ Addr: "localhost", Port: 8001, FieldLogger: fixture.NewTestLogger(t), @@ -42,7 +45,7 @@ func TestHTTPService(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) var wg workgroup.Group - wg.Add(svc.Start) + wg.AddContext(svc.Start) done := make(chan error) go func() { done <- wg.Run(ctx) @@ -95,7 +98,7 @@ func TestHTTPSService(t *testing.T) { checkFatalErr(t, err) defer os.RemoveAll(configDir) - svc := Service{ + svc := httpsvc.Service{ Addr: "localhost", Port: 8001, CABundle: filepath.Join(configDir, "ca.pem"), @@ -115,7 +118,7 @@ func TestHTTPSService(t *testing.T) { }) ctx, cancel := context.WithCancel(context.Background()) var wg workgroup.Group - wg.Add(svc.Start) + wg.AddContext(svc.Start) done := make(chan error) go func() { done <- wg.Run(ctx) @@ -181,3 +184,8 @@ func tryGet(url string, clientCert tls.Certificate, caCertPool *x509.CertPool) ( } return client.Get(url) } + +func TestServiceNotRequireLeaderElection(t *testing.T) { + var s manager.LeaderElectionRunnable = &httpsvc.Service{} + require.False(t, s.NeedLeaderElection()) +} diff --git a/internal/k8s/rbac.go b/internal/k8s/rbac.go index df5bf680996..3f94d001968 100644 --- a/internal/k8s/rbac.go +++ b/internal/k8s/rbac.go @@ -26,4 +26,5 @@ package k8s // +kubebuilder:rbac:groups="",resources=secrets;endpoints;services;namespaces,verbs=get;list;watch // Add RBAC policy to support leader election. 
-// +kubebuilder:rbac:groups="",resources=configmaps,verbs=create;get;update +// +kubebuilder:rbac:groups="",resources=configmaps;events,verbs=create;get;update +// +kubebuilder:rbac:groups="coordination.k8s.io",resources=leases,verbs=create;get;update diff --git a/internal/k8s/status.go b/internal/k8s/status.go index 540cce56ff0..0b1b33d4969 100644 --- a/internal/k8s/status.go +++ b/internal/k8s/status.go @@ -60,11 +60,17 @@ func (m StatusMutatorFunc) Mutate(old client.Object) client.Object { // StatusUpdateHandler holds the details required to actually write an Update back to the referenced object. type StatusUpdateHandler struct { - Log logrus.FieldLogger - Client client.Client - UpdateChannel chan StatusUpdate - LeaderElected chan struct{} - IsLeader bool + log logrus.FieldLogger + client client.Client + updateChannel chan StatusUpdate +} + +func NewStatusUpdateHandler(log logrus.FieldLogger, client client.Client) *StatusUpdateHandler { + return &StatusUpdateHandler{ + log: log, + client: client, + updateChannel: make(chan StatusUpdate, 100), + } } func (suh *StatusUpdateHandler) apply(upd StatusUpdate) { @@ -72,49 +78,43 @@ func (suh *StatusUpdateHandler) apply(upd StatusUpdate) { obj := upd.Resource // Get the resource. - if err := suh.Client.Get(context.Background(), upd.NamespacedName, obj); err != nil { + if err := suh.client.Get(context.Background(), upd.NamespacedName, obj); err != nil { return err } newObj := upd.Mutator.Mutate(obj) if isStatusEqual(obj, newObj) { - suh.Log.WithField("name", upd.NamespacedName.Name). + suh.log.WithField("name", upd.NamespacedName.Name). WithField("namespace", upd.NamespacedName.Namespace). Debug("update was a no-op") return nil } - return suh.Client.Status().Update(context.Background(), newObj) + return suh.client.Status().Update(context.Background(), newObj) }); err != nil { - suh.Log.WithError(err). + suh.log.WithError(err). WithField("name", upd.NamespacedName.Name). WithField("namespace", upd.NamespacedName.Namespace). Error("unable to update status") } } +func (suh *StatusUpdateHandler) NeedLeaderElection() bool { + return true +} + // Start runs the goroutine to perform status writes. -// Until the Contour is elected leader, will drop updates on the floor. -func (suh *StatusUpdateHandler) Start(stop <-chan struct{}) error { +func (suh *StatusUpdateHandler) Start(ctx context.Context) error { + suh.log.Info("started status update handler") + defer suh.log.Info("stopped status update handler") + for { select { - case <-stop: + case <-ctx.Done(): return nil - case <-suh.LeaderElected: - suh.Log.Info("elected leader") - suh.IsLeader = true - // disable this case - suh.LeaderElected = nil - case upd := <-suh.UpdateChannel: - if !suh.IsLeader { - suh.Log.WithField("name", upd.NamespacedName.Name). - WithField("namespace", upd.NamespacedName.Namespace). - Debug("not leader, not applying update") - continue - } - - suh.Log.WithField("name", upd.NamespacedName.Name). + case upd := <-suh.updateChannel: + suh.log.WithField("name", upd.NamespacedName.Name). WithField("namespace", upd.NamespacedName.Namespace). Debug("received a status update") @@ -127,12 +127,8 @@ func (suh *StatusUpdateHandler) Start(stop <-chan struct{}) error { // Writer retrieves the interface that should be used to write to the StatusUpdateHandler. 
func (suh *StatusUpdateHandler) Writer() StatusUpdater { - if suh.UpdateChannel == nil { - suh.UpdateChannel = make(chan StatusUpdate, 100) - } - return &StatusUpdateWriter{ - UpdateChannel: suh.UpdateChannel, + updateChannel: suh.updateChannel, } } @@ -143,10 +139,10 @@ type StatusUpdater interface { // StatusUpdateWriter takes status updates and sends these to the StatusUpdateHandler via a channel. type StatusUpdateWriter struct { - UpdateChannel chan StatusUpdate + updateChannel chan<- StatusUpdate } // Send sends the given StatusUpdate off to the update channel for writing by the StatusUpdateHandler. func (suw *StatusUpdateWriter) Send(update StatusUpdate) { - suw.UpdateChannel <- update + suw.updateChannel <- update } diff --git a/internal/k8s/status_test.go b/internal/k8s/status_test.go new file mode 100644 index 00000000000..87b424d646e --- /dev/null +++ b/internal/k8s/status_test.go @@ -0,0 +1,27 @@ +// Copyright Project Contour Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package k8s_test + +import ( + "testing" + + "github.com/projectcontour/contour/internal/k8s" + "github.com/stretchr/testify/require" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +func TestStatusUpdateHandlerRequiresLeaderElection(t *testing.T) { + var s manager.LeaderElectionRunnable = &k8s.StatusUpdateHandler{} + require.True(t, s.NeedLeaderElection()) +} diff --git a/internal/leadership/mocks/need_leader_election_notification.go b/internal/leadership/mocks/need_leader_election_notification.go new file mode 100644 index 00000000000..17d475c57c9 --- /dev/null +++ b/internal/leadership/mocks/need_leader_election_notification.go @@ -0,0 +1,15 @@ +// Code generated by mockery v2.9.4. DO NOT EDIT. + +package mocks + +import mock "github.com/stretchr/testify/mock" + +// NeedLeaderElectionNotification is an autogenerated mock type for the NeedLeaderElectionNotification type +type NeedLeaderElectionNotification struct { + mock.Mock +} + +// OnElectedLeader provides a mock function with given fields: +func (_m *NeedLeaderElectionNotification) OnElectedLeader() { + _m.Called() +} diff --git a/internal/leadership/notifier.go b/internal/leadership/notifier.go new file mode 100644 index 00000000000..dd993070760 --- /dev/null +++ b/internal/leadership/notifier.go @@ -0,0 +1,39 @@ +// Copyright Project Contour Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package leadership + +import "context" + +type NeedLeaderElectionNotification interface { + OnElectedLeader() +} + +// Notifier is controller-runtime manager runnable that can be used to +// notify other components when leader election has occurred for the current +// manager. +type Notifier struct { + ToNotify []NeedLeaderElectionNotification +} + +func (n *Notifier) NeedLeaderElection() bool { + return true +} + +func (n *Notifier) Start(ctx context.Context) error { + for _, t := range n.ToNotify { + go t.OnElectedLeader() + } + <-ctx.Done() + return nil +} diff --git a/internal/leadership/notifier_test.go b/internal/leadership/notifier_test.go new file mode 100644 index 00000000000..c6c9ce655ad --- /dev/null +++ b/internal/leadership/notifier_test.go @@ -0,0 +1,72 @@ +// Copyright Project Contour Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leadership_test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/projectcontour/contour/internal/leadership" + "github.com/projectcontour/contour/internal/leadership/mocks" + "github.com/stretchr/testify/require" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +//go:generate go run github.com/vektra/mockery/v2 --case=snake --name=NeedLeaderElectionNotification + +func TestNotifier(t *testing.T) { + toNotify1 := &mocks.NeedLeaderElectionNotification{} + toNotify1.On("OnElectedLeader").Once() + toNotify2 := &mocks.NeedLeaderElectionNotification{} + toNotify2.On("OnElectedLeader").Once() + + notifier := &leadership.Notifier{ + ToNotify: []leadership.NeedLeaderElectionNotification{ + toNotify1, + toNotify2, + }, + } + + wg := new(sync.WaitGroup) + ctx, cancel := context.WithCancel(context.Background()) + + wg.Add(1) + go func() { + defer wg.Done() + _ = notifier.Start(ctx) + }() + + // Assert we don't return until cancel + require.Never(t, func() bool { + wg.Wait() + return true + }, time.Second, time.Millisecond*10) + + require.True(t, toNotify1.AssertExpectations(t)) + require.True(t, toNotify2.AssertExpectations(t)) + + cancel() + + require.Eventually(t, func() bool { + wg.Wait() + return true + }, time.Second, time.Millisecond*10) +} + +func TestNotifierRequiresLeaderElection(t *testing.T) { + var notifier manager.LeaderElectionRunnable = &leadership.Notifier{} + require.True(t, notifier.NeedLeaderElection()) +} diff --git a/internal/xdscache/v3/server_test.go b/internal/xdscache/v3/server_test.go index 0bda7415e57..8c080842223 100644 --- a/internal/xdscache/v3/server_test.go +++ b/internal/xdscache/v3/server_test.go @@ -200,25 +200,25 @@ func TestGRPC(t *testing.T) { et, } - eh = &contour.EventHandler{ - Observer: dag.ComposeObservers(xdscache.ObserversOf(resources)...), - FieldLogger: log, - } + eh = contour.NewEventHandler(contour.EventHandlerConfig{ + Logger: log, + Builder: new(dag.Builder), + Observer: dag.ComposeObservers(xdscache.ObserversOf(resources)...), + }) srv := xds.NewServer(nil) contour_xds_v3.RegisterServer(contour_xds_v3.NewContourServer(log, xdscache.ResourcesOf(resources)...), 
srv) l, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) done := make(chan error, 1) - stop := make(chan struct{}) - run := eh.Start() - go run(stop) // nolint:errcheck + ctx, cancel := context.WithCancel(context.Background()) + go eh.Start(ctx) // nolint:errcheck go func() { done <- srv.Serve(l) }() defer func() { srv.GracefulStop() - close(stop) + cancel() <-done }() cc, err := grpc.Dial(l.Addr().String(), grpc.WithInsecure()) diff --git a/test/e2e/deployment.go b/test/e2e/deployment.go index 0df9c2da75c..e81f94fd259 100644 --- a/test/e2e/deployment.go +++ b/test/e2e/deployment.go @@ -37,6 +37,7 @@ import ( "gopkg.in/yaml.v2" apps_v1 "k8s.io/api/apps/v1" batch_v1 "k8s.io/api/batch/v1" + coordinationv1 "k8s.io/api/coordination/v1" v1 "k8s.io/api/core/v1" rbac_v1 "k8s.io/api/rbac/v1" apiextensions_v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" @@ -547,7 +548,7 @@ func (d *Deployment) DeleteResourcesForLocalContour() error { d.HTTPProxyCRD, d.EnvoyServiceAccount, } { - if err := d.ensureDeleted(r); err != nil { + if err := d.EnsureDeleted(r); err != nil { return err } } @@ -669,9 +670,9 @@ func (d *Deployment) StopLocalContour(contourCmd *gexec.Session, configFile stri // - Contour cluster role // - Contour service // - Envoy service -// - Contour deployment +// - Contour deployment (only started if bool passed in is true) // - Envoy DaemonSet -func (d *Deployment) EnsureResourcesForInclusterContour() error { +func (d *Deployment) EnsureResourcesForInclusterContour(startContourDeployment bool) error { fmt.Fprintf(d.cmdOutputWriter, "Deploying Contour with image: %s\n", d.contourImage) if err := d.EnsureNamespace(); err != nil { @@ -737,8 +738,10 @@ func (d *Deployment) EnsureResourcesForInclusterContour() error { } d.ContourDeployment.Spec.Template.Spec.Containers[0].Image = d.contourImage d.ContourDeployment.Spec.Template.Spec.Containers[0].ImagePullPolicy = v1.PullIfNotPresent - if err := d.EnsureContourDeployment(); err != nil { - return err + if startContourDeployment { + if err := d.EnsureContourDeployment(); err != nil { + return err + } } // Update container image. if l := len(d.EnvoyDaemonSet.Spec.Template.Spec.InitContainers); l != 1 { @@ -770,10 +773,17 @@ func (d *Deployment) DeleteResourcesForInclusterContour() error { Namespace: d.Namespace.Name, }, } + leaderElectionLease := &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: "leader-elect", + Namespace: d.Namespace.Name, + }, + } for _, r := range []client.Object{ d.EnvoyDaemonSet, d.ContourDeployment, + leaderElectionLease, leaderElectionConfigMap, d.EnvoyService, d.ContourService, @@ -790,7 +800,7 @@ func (d *Deployment) DeleteResourcesForInclusterContour() error { d.EnvoyServiceAccount, d.ContourServiceAccount, } { - if err := d.ensureDeleted(r); err != nil { + if err := d.EnsureDeleted(r); err != nil { return err } } @@ -798,7 +808,7 @@ func (d *Deployment) DeleteResourcesForInclusterContour() error { return nil } -func (d *Deployment) ensureDeleted(obj client.Object) error { +func (d *Deployment) EnsureDeleted(obj client.Object) error { // Delete the object; if it already doesn't exist, // then we're done. err := d.client.Delete(context.Background(), obj) @@ -820,6 +830,9 @@ func (d *Deployment) ensureDeleted(obj client.Object) error { return fmt.Errorf("error waiting for deletion of resource %T %s/%s: %v", obj, obj.GetNamespace(), obj.GetName(), err) } + // Clear out resource version to ensure object can be used again. 
+ obj.SetResourceVersion("") + return nil } diff --git a/test/e2e/incluster/incluster_test.go b/test/e2e/incluster/incluster_test.go index 58a8bb8734a..52d7acbd9ae 100644 --- a/test/e2e/incluster/incluster_test.go +++ b/test/e2e/incluster/incluster_test.go @@ -33,7 +33,7 @@ func TestIncluster(t *testing.T) { } var _ = BeforeSuite(func() { - require.NoError(f.T(), f.Deployment.EnsureResourcesForInclusterContour()) + require.NoError(f.T(), f.Deployment.EnsureResourcesForInclusterContour(false)) }) var _ = AfterSuite(func() { @@ -44,11 +44,19 @@ var _ = AfterSuite(func() { }) var _ = Describe("Incluster", func() { - BeforeEach(func() { + JustBeforeEach(func() { + // Create contour deployment here so we can modify or do other + // actions in BeforeEach. + require.NoError(f.T(), f.Deployment.EnsureContourDeployment()) require.NoError(f.T(), f.Deployment.WaitForContourDeploymentUpdated()) require.NoError(f.T(), f.Deployment.WaitForEnvoyDaemonSetUpdated()) }) + AfterEach(func() { + // Clean out contour after each test. + require.NoError(f.T(), f.Deployment.EnsureDeleted(f.Deployment.ContourDeployment)) + }) + f.NamespacedTest("smoke-test", testSimpleSmoke) f.NamespacedTest("leader-election", testLeaderElection) diff --git a/test/e2e/incluster/leaderelection_test.go b/test/e2e/incluster/leaderelection_test.go index cc9e5b3ffe1..a5519e2e0c1 100644 --- a/test/e2e/incluster/leaderelection_test.go +++ b/test/e2e/incluster/leaderelection_test.go @@ -26,8 +26,10 @@ import ( . "github.com/onsi/ginkgo" "github.com/stretchr/testify/require" + coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -39,7 +41,7 @@ func testLeaderElection(namespace string) { // unit tests as it is difficult to observe e.g. which contour instance // has set status on an object. 
Specify("leader election resources are created as expected", func() { - getLeaderPodName := func() (string, error) { + getLeaderID := func() (string, error) { type leaderInfo struct { HolderIdentity string } @@ -50,11 +52,20 @@ func testLeaderElection(namespace string) { Namespace: f.Deployment.Namespace.Name, }, } - if err := f.Client.Get(context.TODO(), client.ObjectKeyFromObject(leaderElectionConfigMap), leaderElectionConfigMap); err != nil { return "", err } + leaderElectionLease := &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: "leader-elect", + Namespace: f.Deployment.Namespace.Name, + }, + } + if err := f.Client.Get(context.TODO(), client.ObjectKeyFromObject(leaderElectionLease), leaderElectionLease); err != nil { + return "", err + } + var ( infoRaw string found bool @@ -67,30 +78,79 @@ func testLeaderElection(namespace string) { if err := json.Unmarshal([]byte(infoRaw), &li); err != nil { return "", err } + leaseHolder := pointer.StringDeref(leaderElectionLease.Spec.HolderIdentity, "") + if leaseHolder != li.HolderIdentity { + return "", fmt.Errorf("lease leader %q and configmap leader %q do not match", leaseHolder, li.HolderIdentity) + } if !strings.HasPrefix(li.HolderIdentity, "contour-") { return "", fmt.Errorf("invalid leader name: %q", li.HolderIdentity) } return li.HolderIdentity, nil } - originalLeader, err := getLeaderPodName() - require.NoError(f.T(), err) + podNameFromLeaderID := func(id string) string { + require.Greater(f.T(), len(id), 37) + return id[:len(id)-37] + } + + var originalLeader string + require.Eventually(f.T(), func() bool { + var err error + originalLeader, err = getLeaderID() + return err == nil + }, 2*time.Minute, f.RetryInterval) + + events := &corev1.EventList{} + listOptions := &client.ListOptions{ + Namespace: f.Deployment.Namespace.Name, + } + require.NoError(f.T(), f.Client.List(context.TODO(), events, listOptions)) + foundEvents := map[string]struct{}{} + for _, e := range events.Items { + if e.Reason == "LeaderElection" && e.Source.Component == originalLeader { + foundEvents[e.InvolvedObject.Kind] = struct{}{} + } + } + require.Contains(f.T(), foundEvents, "Lease") + require.Contains(f.T(), foundEvents, "ConfigMap") // Delete contour leader pod. leaderPod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: originalLeader, + // Chop off _UUID suffix + Name: podNameFromLeaderID(originalLeader), Namespace: f.Deployment.Namespace.Name, }, } require.NoError(f.T(), f.Client.Delete(context.TODO(), leaderPod)) + newLeader := originalLeader require.Eventually(f.T(), func() bool { - leader, err := getLeaderPodName() + var err error + newLeader, err = getLeaderID() if err != nil { return false } - return leader != originalLeader + return newLeader != originalLeader }, 2*time.Minute, f.RetryInterval) + + require.NoError(f.T(), f.Client.List(context.TODO(), events, listOptions)) + foundEvents = map[string]struct{}{} + for _, e := range events.Items { + if e.Reason == "LeaderElection" && e.Source.Component == newLeader { + foundEvents[e.InvolvedObject.Kind] = struct{}{} + } + } + require.Contains(f.T(), foundEvents, "Lease") + require.Contains(f.T(), foundEvents, "ConfigMap") + + // Check leader pod exists. 
+ leaderPod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podNameFromLeaderID(newLeader), + Namespace: f.Deployment.Namespace.Name, + }, + } + require.NoError(f.T(), f.Client.Get(context.TODO(), client.ObjectKeyFromObject(leaderPod), leaderPod)) }) } diff --git a/test/e2e/incluster/rbac_test.go b/test/e2e/incluster/rbac_test.go index ac7c21b5c0d..4ba1224ea0c 100644 --- a/test/e2e/incluster/rbac_test.go +++ b/test/e2e/incluster/rbac_test.go @@ -176,6 +176,7 @@ func testIngressResourceRBAC(namespace string) { Path: "/", Condition: e2e.HasStatusCode(200), }) + require.NotNil(f.T(), res, "request never succeeded") require.Truef(f.T(), ok, "expected 200 response code, got %d", res.StatusCode) }) } diff --git a/test/e2e/upgrade/upgrade_test.go b/test/e2e/upgrade/upgrade_test.go index 3cc052213a1..7a6ae39f9d2 100644 --- a/test/e2e/upgrade/upgrade_test.go +++ b/test/e2e/upgrade/upgrade_test.go @@ -101,7 +101,7 @@ var _ = Describe("upgrading Contour", func() { require.NoError(f.T(), err) By("deploying updated contour resources") - require.NoError(f.T(), f.Deployment.EnsureResourcesForInclusterContour()) + require.NoError(f.T(), f.Deployment.EnsureResourcesForInclusterContour(true)) By("waiting for contour deployment to be updated") require.NoError(f.T(), f.Deployment.WaitForContourDeploymentUpdated()) diff --git a/tools.go b/tools.go index 4480a4b5796..31a701a2c89 100644 --- a/tools.go +++ b/tools.go @@ -5,7 +5,7 @@ package tools import ( _ "github.com/ahmetb/gen-crd-api-reference-docs" - + _ "github.com/vektra/mockery/v2" _ "sigs.k8s.io/controller-tools/cmd/controller-gen" _ "sigs.k8s.io/kustomize/kyaml" )
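The changes above convert Contour's long-running subroutines (event handler, status writer, debug and metrics services) from workgroup-style functions taking a stop channel into controller-runtime Runnables that take a context and report whether they require leader election. Below is a minimal sketch of that shape using the real controller-runtime manager interfaces; the waitForShutdown type and its leaderOnly field are illustrative names, not Contour code.

package sketch

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// waitForShutdown is a minimal component in the shape this diff converts
// Contour's subroutines to: Start takes a context instead of a stop channel,
// and NeedLeaderElection tells the Manager whether to hold the component
// back until this replica wins the leader election.
type waitForShutdown struct {
	leaderOnly bool
}

// Compile-time checks that the sketch satisfies the controller-runtime
// interfaces the rewritten components rely on.
var (
	_ manager.Runnable               = &waitForShutdown{}
	_ manager.LeaderElectionRunnable = &waitForShutdown{}
)

// NeedLeaderElection mirrors loadBalancerStatusWriter and StatusUpdateHandler
// (true) versus EventHandler, debug.Service, and httpsvc.Service (false).
func (w *waitForShutdown) NeedLeaderElection() bool { return w.leaderOnly }

// Start blocks until the Manager cancels the context, the same shutdown
// signal the converted components now select on via ctx.Done().
func (w *waitForShutdown) Start(ctx context.Context) error {
	<-ctx.Done()
	return nil
}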
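internal/controller/controller.go adds an unexported noLeaderElectionController wrapper so the Gateway API controllers run on every replica, not just the leader. The sketch below shows the registration flow the Register*Controller functions follow; registerAlways and nonLeaderController are hypothetical names standing in for the real, unexported helpers.

package sketch

import (
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// nonLeaderController stands in for the unexported noLeaderElectionController
// wrapper: it embeds a controller-runtime Controller and forces
// NeedLeaderElection to false so the Manager starts it on every replica.
type nonLeaderController struct {
	controller.Controller
}

func (*nonLeaderController) NeedLeaderElection() bool { return false }

// registerAlways shows the flow RegisterGatewayController and friends use:
// build the controller unmanaged, then hand it to the Manager wrapped so it
// is started regardless of leader-election status.
func registerAlways(mgr manager.Manager, name string, r reconcile.Reconciler) (controller.Controller, error) {
	c, err := controller.NewUnmanaged(name, mgr, controller.Options{Reconciler: r})
	if err != nil {
		return nil, err
	}
	if err := mgr.Add(&nonLeaderController{Controller: c}); err != nil {
		return nil, err
	}
	return c, nil
}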
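internal/leadership adds a Notifier runnable whose only job is to fan out OnElectedLeader callbacks once the Manager elects this replica leader. The sketch below shows how components subscribe; the notifyFunc adapter and the string labels are illustrative, the import only works from within the Contour module, and in the real wiring the EventHandler, reconcilers, and metrics observer implement OnElectedLeader directly while the Notifier is added to the Manager (not shown here).

package main

import (
	"context"
	"fmt"

	"github.com/projectcontour/contour/internal/leadership"
)

// notifyFunc adapts a plain function to the NeedLeaderElectionNotification
// interface for demonstration purposes.
type notifyFunc func()

func (f notifyFunc) OnElectedLeader() { f() }

func main() {
	elected := make(chan string, 2)

	notifier := &leadership.Notifier{
		ToNotify: []leadership.NeedLeaderElectionNotification{
			notifyFunc(func() { elected <- "event handler" }),
			notifyFunc(func() { elected <- "gateway reconciler" }),
		},
	}

	// Because Notifier.NeedLeaderElection returns true, the Manager only
	// calls Start after winning the election; starting it directly here
	// stands in for that moment.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go notifier.Start(ctx) // nolint:errcheck

	fmt.Println(<-elected, "was notified of leadership")
	fmt.Println(<-elected, "was notified of leadership")
}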
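The EventHandler's Sequence comment describes a non-blocking send: the counter always advances, but notifications are dropped when no receiver is ready so the event loop never stalls. Here is that pattern in isolation; the sequencer type is an illustrative stand-in.

package main

import "fmt"

// sequencer mimics the EventHandler's sequence bookkeeping: seq counts every
// processed update, while the buffered channel only carries notifications a
// receiver actually picks up.
type sequencer struct {
	seq      int
	sequence chan int
}

func (s *sequencer) incSequence() {
	s.seq++
	select {
	case s.sequence <- s.seq:
		// A receiver or buffer slot was ready.
	default:
		// Nobody is listening; drop the notification instead of blocking
		// the caller.
	}
}

func main() {
	s := &sequencer{sequence: make(chan int, 1)}
	s.incSequence()
	s.incSequence()           // dropped: the buffer already holds 1
	fmt.Println(<-s.sequence) // prints 1
	s.incSequence()
	fmt.Println(<-s.sequence) // prints 3, showing update 2 was skipped
}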
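RebuildMetricsObserver gates the HTTPProxy metric on leadership by closing a channel in OnElectedLeader and doing a non-blocking receive on every rebuild. The same gate in isolation; leaderGate is an illustrative name.

package main

import "fmt"

// leaderGate mimics RebuildMetricsObserver's httpProxyMetricsEnabled channel:
// it is closed when this process is elected leader, and a non-blocking
// receive answers "are we leader yet?" on every DAG rebuild.
type leaderGate struct {
	enabled chan struct{}
}

func newLeaderGate() *leaderGate {
	return &leaderGate{enabled: make(chan struct{})}
}

// OnElectedLeader closes the channel; receives on a closed channel never block.
func (g *leaderGate) OnElectedLeader() {
	close(g.enabled)
}

func (g *leaderGate) isLeader() bool {
	select {
	case <-g.enabled:
		return true
	default:
		return false
	}
}

func main() {
	g := newLeaderGate()
	fmt.Println("before election:", g.isLeader()) // false: the channel is still open
	g.OnElectedLeader()
	fmt.Println("after election:", g.isLeader()) // true: a closed channel is always ready
}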
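The httpsvc and debug services now implement the Runnable contract directly: serve until the context is done, then shut down gracefully. A self-contained sketch of that shape follows; serveUntilDone is a hypothetical helper, and the five-second grace period echoes the comment in internal/httpsvc rather than being a requirement.

package sketch

import (
	"context"
	"net/http"
	"time"
)

// serveUntilDone runs an http.Server until ctx is cancelled, then shuts it
// down with a short grace period, mirroring httpsvc.Service.Start.
func serveUntilDone(ctx context.Context, addr string, mux *http.ServeMux) error {
	srv := &http.Server{Addr: addr, Handler: mux}

	go func() {
		<-ctx.Done()
		// Shut down the server with 5 seconds grace.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		srv.Shutdown(shutdownCtx) // nolint:errcheck
	}()

	if err := srv.ListenAndServe(); err != http.ErrServerClosed {
		return err
	}
	return nil
}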
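The e2e tests now expect a "leader-elect" Lease alongside the existing ConfigMap, which corresponds to controller-runtime's combined ConfigMap-and-Lease lock. The Manager setup itself is outside this diff, so the options below are only an assumption-laden sketch of such a configuration; the lock name and namespace are taken from the test expectations, not from Contour's actual wiring.

package sketch

import (
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	ctrl "sigs.k8s.io/controller-runtime"
)

// newManager sketches a Manager whose leader lock is held in both a
// ConfigMap and a Lease named "leader-elect". The values here are
// assumptions for illustration only.
func newManager() (ctrl.Manager, error) {
	return ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		LeaderElection:             true,
		LeaderElectionResourceLock: resourcelock.ConfigMapsLeasesResourceLock,
		LeaderElectionID:           "leader-elect",
		LeaderElectionNamespace:    "projectcontour",
	})
}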
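The leader-election e2e test cross-checks the Lease holder against the ConfigMap annotation. Below is a small standalone client that reads the same Lease and prints the current holder; the "projectcontour" namespace is an assumption, since the test uses whatever namespace its deployment framework created.

package main

import (
	"context"
	"fmt"

	coordinationv1 "k8s.io/api/coordination/v1"
	"k8s.io/utils/pointer"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func main() {
	c, err := client.New(ctrl.GetConfigOrDie(), client.Options{})
	if err != nil {
		panic(err)
	}

	// Fetch the "leader-elect" Lease and report which Contour pod holds it.
	lease := &coordinationv1.Lease{}
	key := client.ObjectKey{Namespace: "projectcontour", Name: "leader-elect"}
	if err := c.Get(context.Background(), key, lease); err != nil {
		panic(err)
	}
	fmt.Println("current leader:", pointer.StringDeref(lease.Spec.HolderIdentity, "<none>"))
}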