From d90547f23b243dbbf26e70030dcb31bac54c604f Mon Sep 17 00:00:00 2001 From: DanStough Date: Tue, 12 Mar 2024 16:36:46 -0400 Subject: [PATCH 1/2] fix(control-plane): acl tokens deleted while pods in graceful shutdown --- .changelog/3736.txt | 4 + acceptance/framework/consul/helm_cluster.go | 4 + .../framework/consul/helm_cluster_test.go | 9 +- .../endpoints/endpoints_controller.go | 185 ++++++++++++++---- .../endpoints/endpoints_controller_test.go | 103 +++++++++- .../subcommand/inject-connect/command.go | 1 + 6 files changed, 250 insertions(+), 56 deletions(-) create mode 100644 .changelog/3736.txt diff --git a/.changelog/3736.txt b/.changelog/3736.txt new file mode 100644 index 0000000000..1b86f858f4 --- /dev/null +++ b/.changelog/3736.txt @@ -0,0 +1,4 @@ +```release-note:bug +control-plane: fix an issue where ACL token cleanup did not respect a pod's graceful shutdown period (GracefulShutdownPeriodSeconds) and +tokens were invalidated immediately when a pod entered the Terminating state. +``` \ No newline at end of file diff --git a/acceptance/framework/consul/helm_cluster.go b/acceptance/framework/consul/helm_cluster.go index bdd0c9bb08..ee5476be88 100644 --- a/acceptance/framework/consul/helm_cluster.go +++ b/acceptance/framework/consul/helm_cluster.go @@ -624,6 +624,10 @@ func defaultValues() map[string]string { // (false positive). "dns.enabled": "false", + // Adjust the default value from 30s to 1s since we have several tests that verify tokens are cleaned up, + // and many of them use the default retryer (7s max). + "connectInject.sidecarProxy.lifecycle.defaultShutdownGracePeriodSeconds": "1", + // Enable trace logs for servers and clients. "server.extraConfig": `"{\"log_level\": \"TRACE\"}"`, "client.extraConfig": `"{\"log_level\": \"TRACE\"}"`, diff --git a/acceptance/framework/consul/helm_cluster_test.go b/acceptance/framework/consul/helm_cluster_test.go index 732ecf2e7e..d4ae11a655 100644 --- a/acceptance/framework/consul/helm_cluster_test.go +++ b/acceptance/framework/consul/helm_cluster_test.go @@ -30,7 +30,8 @@ func TestNewHelmCluster(t *testing.T) { "global.image": "test-config-image", "global.logLevel": "debug", "server.replicas": "1", - "connectInject.transparentProxy.defaultEnabled": "false", + "connectInject.transparentProxy.defaultEnabled": "false", + "connectInject.sidecarProxy.lifecycle.defaultShutdownGracePeriodSeconds": "1", "dns.enabled": "false", "server.extraConfig": `"{\"log_level\": \"TRACE\"}"`, "client.extraConfig": `"{\"log_level\": \"TRACE\"}"`, @@ -43,7 +44,8 @@ func TestNewHelmCluster(t *testing.T) { "global.logLevel": "debug", "server.bootstrapExpect": "3", "server.replicas": "3", - "connectInject.transparentProxy.defaultEnabled": "true", + "connectInject.transparentProxy.defaultEnabled": "true", + "connectInject.sidecarProxy.lifecycle.defaultShutdownGracePeriodSeconds": "3", "dns.enabled": "true", "server.extraConfig": `"{\"foo\": \"bar\"}"`, "client.extraConfig": `"{\"foo\": \"bar\"}"`, @@ -54,7 +56,8 @@ func TestNewHelmCluster(t *testing.T) { "global.logLevel": "debug", "server.bootstrapExpect": "3", "server.replicas": "3", - "connectInject.transparentProxy.defaultEnabled": "true", + "connectInject.transparentProxy.defaultEnabled": "true", + "connectInject.sidecarProxy.lifecycle.defaultShutdownGracePeriodSeconds": "3", "dns.enabled": "true", "server.extraConfig": `"{\"foo\": \"bar\"}"`, "client.extraConfig": `"{\"foo\": \"bar\"}"`, diff --git a/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go
b/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go index 81b1674abc..0af8aa1e44 100644 --- a/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go +++ b/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go @@ -7,15 +7,18 @@ import ( "context" "encoding/json" "fmt" + "math" "net" "regexp" "strconv" "strings" + "time" mapset "github.com/deckarep/golang-set" "github.com/go-logr/logr" "github.com/hashicorp/consul-k8s/control-plane/connect-inject/common" "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/lifecycle" "github.com/hashicorp/consul-k8s/control-plane/connect-inject/metrics" "github.com/hashicorp/consul-k8s/control-plane/consul" "github.com/hashicorp/consul-k8s/control-plane/helper/parsetags" @@ -101,6 +104,8 @@ type Controller struct { // any created Consul namespaces to allow cross namespace service discovery. // Only necessary if ACLs are enabled. CrossNSACLPolicy string + // LifecycleConfig sets graceful startup/shutdown defaults for pods. + LifecycleConfig lifecycle.Config // ReleaseName is the Consul Helm installation release. ReleaseName string // ReleaseNamespace is the namespace where Consul is installed. @@ -160,18 +165,14 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } err = r.Client.Get(ctx, req.NamespacedName, &serviceEndpoints) - // endpointPods holds a set of all pods this endpoints object is currently pointing to. - // We use this later when we reconcile ACL tokens to decide whether an ACL token in Consul - // is for a pod that no longer exists. - endpointPods := mapset.NewSet() // If the endpoints object has been deleted (and we get an IsNotFound // error), we need to deregister all instances in Consul for that service. if k8serrors.IsNotFound(err) { // Deregister all instances in Consul for this service. The function deregisterService handles // the case where the Consul service name is different from the Kubernetes service name. - err = r.deregisterService(apiClient, req.Name, req.Namespace, nil) - return ctrl.Result{}, err + requeueAfter, err := r.deregisterService(ctx, apiClient, req.Name, req.Namespace, nil) + return ctrl.Result{RequeueAfter: requeueAfter}, err } else if err != nil { r.Log.Error(err, "failed to get Endpoints", "name", req.Name, "ns", req.Namespace) return ctrl.Result{}, err @@ -184,8 +185,8 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu if isLabeledIgnore(serviceEndpoints.Labels) { // We always deregister the service to handle the case where a user has registered the service, then added the label later. r.Log.Info("ignoring endpoint labeled with `consul.hashicorp.com/service-ignore: \"true\"`", "name", req.Name, "namespace", req.Namespace) - err = r.deregisterService(apiClient, req.Name, req.Namespace, nil) - return ctrl.Result{}, err + requeueAfter, err := r.deregisterService(ctx, apiClient, req.Name, req.Namespace, nil) + return ctrl.Result{RequeueAfter: requeueAfter}, err } // endpointAddressMap stores every IP that corresponds to a Pod in the Endpoints object.
It is used to compare @@ -220,12 +221,13 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } if hasBeenInjected(pod) { - endpointPods.Add(address.TargetRef.Name) if isConsulDataplaneSupported(pod) { - if err = r.registerServicesAndHealthCheck(apiClient, pod, serviceEndpoints, healthStatus, endpointAddressMap); err != nil { + if err = r.registerServicesAndHealthCheck(apiClient, pod, serviceEndpoints, healthStatus); err != nil { r.Log.Error(err, "failed to register services or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) errs = multierror.Append(errs, err) } + // Build the endpointAddressMap up for deregistering service instances later. + endpointAddressMap[pod.Status.PodIP] = true } else { r.Log.Info("detected an update to pre-consul-dataplane service", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) nodeAgentClientCfg, err := r.consulClientCfgForNodeAgent(apiClient, pod, serverState) @@ -248,11 +250,12 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } if isGateway(pod) { - endpointPods.Add(address.TargetRef.Name) - if err = r.registerGateway(apiClient, pod, serviceEndpoints, healthStatus, endpointAddressMap); err != nil { + if err = r.registerGateway(apiClient, pod, serviceEndpoints, healthStatus); err != nil { r.Log.Error(err, "failed to register gateway or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) errs = multierror.Append(errs, err) } + // Build the endpointAddressMap up for deregistering service instances later. + endpointAddressMap[pod.Status.PodIP] = true } } } @@ -261,12 +264,13 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu // Compare service instances in Consul with addresses in Endpoints. If an address is not in Endpoints, deregister // from Consul. This uses endpointAddressMap which is populated with the addresses in the Endpoints object during // the registration codepath. - if err = r.deregisterService(apiClient, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap); err != nil { + requeueAfter, err := r.deregisterService(ctx, apiClient, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap) + if err != nil { r.Log.Error(err, "failed to deregister endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) errs = multierror.Append(errs, err) } - return ctrl.Result{}, errs + return ctrl.Result{RequeueAfter: requeueAfter}, errs } func (r *Controller) Logger(name types.NamespacedName) logr.Logger { @@ -281,10 +285,7 @@ func (r *Controller) SetupWithManager(mgr ctrl.Manager) error { // registerServicesAndHealthCheck creates Consul registrations for the service and proxy and registers them with Consul. // It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready. -func (r *Controller) registerServicesAndHealthCheck(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string, endpointAddressMap map[string]bool) error { - // Build the endpointAddressMap up for deregistering service instances later. 
- endpointAddressMap[pod.Status.PodIP] = true - +func (r *Controller) registerServicesAndHealthCheck(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) error { var managedByEndpointsController bool if raw, ok := pod.Labels[constants.KeyManagedBy]; ok && raw == constants.ManagedByValue { managedByEndpointsController = true @@ -320,10 +321,7 @@ // registerGateway creates Consul registrations for the Connect Gateways and registers them with Consul. // It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready. -func (r *Controller) registerGateway(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string, endpointAddressMap map[string]bool) error { - // Build the endpointAddressMap up for deregistering service instances later. - endpointAddressMap[pod.Status.PodIP] = true - +func (r *Controller) registerGateway(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) error { var managedByEndpointsController bool if raw, ok := pod.Labels[constants.KeyManagedBy]; ok && raw == constants.ManagedByValue { managedByEndpointsController = true @@ -909,40 +907,54 @@ func getHealthCheckStatusReason(healthCheckStatus, podName, podNamespace string) // The argument endpointsAddressesMap decides whether to deregister *all* service instances or selectively deregister // them only if they are not in endpointsAddressesMap. If the map is nil, it will deregister all instances. If the map // has addresses, it will only deregister instances not in the map. -func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error { +// If the pod backing a Consul service instance still exists and the graceful shutdown lifecycle mode is enabled, the instance +// will not be deregistered. Instead, its health check will be updated to Critical in order to drain incoming traffic, and +// this function will return a requeueAfter duration. The caller can use this to requeue the event after the longest shutdown +// interval so that these instances are cleaned up once they have exited. +func (r *Controller) deregisterService( + ctx context.Context, + apiClient *api.Client, + k8sSvcName string, + k8sSvcNamespace string, + endpointsAddressesMap map[string]bool) (time.Duration, error) { + // Get services matching metadata from Consul serviceInstances, err := r.serviceInstances(apiClient, k8sSvcName, k8sSvcNamespace) if err != nil { r.Log.Error(err, "failed to get service instances", "name", k8sSvcName) - return err + return 0, err } var errs error + var requeueAfter time.Duration for _, svc := range serviceInstances { // We need to get services matching "k8s-service-name" and "k8s-namespace" metadata. // If we selectively deregister, only deregister if the address is not in the map. Otherwise, deregister // every service instance. var serviceDeregistered bool - if endpointsAddressesMap != nil { - if _, ok := endpointsAddressesMap[svc.ServiceAddress]; !ok { - // If the service address is not in the Endpoints addresses, deregister it.
- r.Log.Info("deregistering service from consul", "svc", svc.ServiceID) - _, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{ - Node: svc.Node, - ServiceID: svc.ServiceID, - Namespace: svc.Namespace, - }, nil) - if err != nil { - // Do not exit right away as there might be other services that need to be deregistered. - r.Log.Error(err, "failed to deregister service instance", "id", svc.ServiceID) - errs = multierror.Append(errs, err) - } else { - serviceDeregistered = true - } + + if addressIsMissingFromEndpointsMap(svc.ServiceAddress, endpointsAddressesMap) { + // If graceful shutdown is enabled, continue to the next service instance and + // mark that an event requeue is needed. We should requeue at the longest time interval + // to prevent excessive re-queues. Also, updating the health status in Consul to Critical + // should prevent routing during gracefulShutdown. + podShutdownDuration, err := r.getGracefulShutdownAndUpdatePodCheck(ctx, apiClient, svc, k8sSvcNamespace) + if err != nil { + r.Log.Error(err, "failed to get pod shutdown duration", "svc", svc.ServiceName) + errs = multierror.Append(errs, err) } - } else { + + // set requeue response, then continue to the next service instance + if podShutdownDuration > requeueAfter { + requeueAfter = podShutdownDuration + } + if podShutdownDuration > 0 { + continue + } + + // If the service address is not in the Endpoints addresses, deregister it. r.Log.Info("deregistering service from consul", "svc", svc.ServiceID) - _, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{ + _, err = apiClient.Catalog().Deregister(&api.CatalogDeregistration{ Node: svc.Node, ServiceID: svc.ServiceID, Namespace: svc.Namespace, @@ -974,8 +986,87 @@ func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvc } } - return errs + if requeueAfter > 0 { + r.Log.Info("re-queueing event for graceful shutdown", "name", k8sSvcName, "k8sNamespace", k8sSvcNamespace, "requeueAfter", requeueAfter) + } + + return requeueAfter, errs +} + +// getGracefulShutdownAndUpdatePodCheck checks if the pod is in the process of being terminated and if so, updates the +// health status of the service to critical. It returns the duration for which the pod should be re-queued (which is the pods +// gracefulShutdownPeriod setting). +func (r *Controller) getGracefulShutdownAndUpdatePodCheck(ctx context.Context, apiClient *api.Client, svc *api.CatalogService, k8sNamespace string) (time.Duration, error) { + // Get the pod, and check if it is still running. 
We do this to defer ACL/node cleanup for pods that are + in graceful termination. + podName := svc.ServiceMeta[constants.MetaKeyPodName] + if podName == "" { + return 0, nil + } + + var pod corev1.Pod + err := r.Client.Get(ctx, types.NamespacedName{Name: podName, Namespace: k8sNamespace}, &pod) + if k8serrors.IsNotFound(err) { + return 0, nil + } + if err != nil { + r.Log.Error(err, "failed to get terminating pod", "name", podName, "k8sNamespace", k8sNamespace) + return 0, fmt.Errorf("failed to get terminating pod %s/%s: %w", k8sNamespace, podName, err) + } + + shutdownSeconds, err := r.getGracefulShutdownPeriodSecondsForPod(pod) + if err != nil { + r.Log.Error(err, "failed to get graceful shutdown period for pod", "name", podName, "k8sNamespace", k8sNamespace) + return 0, fmt.Errorf("failed to get graceful shutdown period for pod %s/%s: %w", k8sNamespace, podName, err) + } + + if shutdownSeconds > 0 { + // Update the health status of the service to critical so that we can drain inbound traffic. + // We don't need to handle the proxy service separately since this loop visits every service instance. + serviceRegistration := &api.CatalogRegistration{ + Node: common.ConsulNodeNameFromK8sNode(pod.Spec.NodeName), + Address: pod.Status.HostIP, + // Service is nil since we are patching the health status + Check: &api.AgentCheck{ + CheckID: consulHealthCheckID(pod.Namespace, svc.ServiceID), + Name: consulKubernetesCheckName, + Type: consulKubernetesCheckType, + Status: api.HealthCritical, + ServiceID: svc.ServiceID, + Output: fmt.Sprintf("Pod \"%s/%s\" is terminating", pod.Namespace, podName), + Namespace: r.consulNamespace(pod.Namespace), + }, + SkipNodeUpdate: true, + } + + r.Log.Info("updating health status of service with Consul to critical in order to drain inbound traffic", "name", svc.ServiceName, + "id", svc.ServiceID, "pod", podName, "k8sNamespace", pod.Namespace) + _, err = apiClient.Catalog().Register(serviceRegistration, nil) + if err != nil { + r.Log.Error(err, "failed to update service health status to critical", "name", svc.ServiceName, "pod", podName) + return 0, fmt.Errorf("failed to update service health status for pod %s/%s to critical: %w", pod.Namespace, podName, err) + } + + // Return the duration after which the event should be re-queued. We add 20% to shutdownSeconds to account for + // any potential delay in the pod being killed. + return time.Duration(shutdownSeconds+int(math.Ceil(float64(shutdownSeconds)*0.2))) * time.Second, nil + } + return 0, nil +} + +// getGracefulShutdownPeriodSecondsForPod returns the graceful shutdown period for the pod. If one is not specified, +// either through the controller configuration or pod annotations, it returns 0. +func (r *Controller) getGracefulShutdownPeriodSecondsForPod(pod corev1.Pod) (int, error) { + enabled, err := r.LifecycleConfig.EnableProxyLifecycle(pod) + if err != nil { + return 0, fmt.Errorf("failed to parse proxy lifecycle configuration for pod %s/%s: %w", pod.Namespace, pod.Name, err) + } + // Check that SidecarProxyLifecycle is enabled. + if !enabled { + return 0, nil + } + return r.LifecycleConfig.ShutdownGracePeriodSeconds(pod) } // deregisterNode removes a node if it does not have any associated services attached to it.
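The 20% padding above is what drives the controller's requeue interval. To make the arithmetic concrete, here is a minimal, runnable Go sketch of that computation; the helper name `paddedRequeueDuration` is hypothetical and exists only for illustration (the real logic lives inline in `getGracefulShutdownAndUpdatePodCheck`):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// paddedRequeueDuration mirrors the inline computation in
// getGracefulShutdownAndUpdatePodCheck: the configured shutdown grace
// period plus a 20% cushion (rounded up) to absorb delays in the pod
// actually being killed.
func paddedRequeueDuration(shutdownSeconds int) time.Duration {
	padding := int(math.Ceil(float64(shutdownSeconds) * 0.2))
	return time.Duration(shutdownSeconds+padding) * time.Second
}

func main() {
	fmt.Println(paddedRequeueDuration(5))  // 6s, matching the requeueAfter asserted in the unit test below
	fmt.Println(paddedRequeueDuration(15)) // 18s
}
```

With the 5-second grace period configured by the unit test case that follows, this yields the 6-second requeueAfter the test asserts.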
@@ -1502,3 +1593,11 @@ func getMultiPortIdx(pod corev1.Pod, serviceEndpoints corev1.Endpoints) int { } return -1 } + +func addressIsMissingFromEndpointsMap(address string, endpointsAddressesMap map[string]bool) bool { + if endpointsAddressesMap == nil { + return true + } + _, ok := endpointsAddressesMap[address] + return !ok +} diff --git a/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go b/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go index b837d0f99b..2e531dae15 100644 --- a/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go +++ b/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go @@ -8,6 +8,7 @@ import ( "fmt" "strings" "testing" + "time" mapset "github.com/deckarep/golang-set" logrtest "github.com/go-logr/logr/testr" @@ -3904,12 +3905,16 @@ func TestReconcileUpdateEndpoint_LegacyService(t *testing.T) { func TestReconcileDeleteEndpoint(t *testing.T) { t.Parallel() cases := []struct { - name string - consulSvcName string - consulPodUid string - expectServicesToBeDeleted bool - initialConsulSvcs []*api.AgentService - enableACLs bool + name string + consulSvcName string + pod *corev1.Pod // If this is present, a pod will be created in the fake kube client + consulPodUid string + expectServicesToBeDeleted bool + expectServicesToBeCritical bool + initialConsulSvcs []*api.AgentService + enableACLs bool + expectTokens bool + requeueAfter time.Duration }{ { name: "Legacy service: does not delete", @@ -4031,6 +4036,66 @@ func TestReconcileDeleteEndpoint(t *testing.T) { }, enableACLs: true, }, + { + name: "When graceful shutdown is enabled with ACLs, tokens should not be deleted", + consulSvcName: "service-deleted", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + UID: "123", + Annotations: map[string]string{ + constants.AnnotationEnableSidecarProxyLifecycle: "true", + constants.AnnotationSidecarProxyLifecycleShutdownGracePeriodSeconds: "5", + }, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName, + // We don't need any other fields for this test + }, + }, + consulPodUid: "123", + expectServicesToBeDeleted: false, + expectServicesToBeCritical: true, + initialConsulSvcs: []*api.AgentService{ + { + ID: "pod1-service-deleted", + Service: "service-deleted", + Port: 80, + Address: "1.2.3.4", + Meta: map[string]string{ + metaKeyKubeServiceName: "service-deleted", + constants.MetaKeyKubeNS: "default", + metaKeyManagedBy: constants.ManagedByValue, + metaKeySyntheticNode: "true", + constants.MetaKeyPodName: "pod1", + constants.MetaKeyPodUID: "123", + }, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-deleted-sidecar-proxy", + Service: "service-deleted-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-deleted", + DestinationServiceID: "pod1-service-deleted", + }, + Meta: map[string]string{ + metaKeyKubeServiceName: "service-deleted", + constants.MetaKeyKubeNS: "default", + metaKeyManagedBy: constants.ManagedByValue, + metaKeySyntheticNode: "true", + constants.MetaKeyPodName: "pod1", + constants.MetaKeyPodUID: "123", + }, + }, + }, + requeueAfter: time.Duration(6) * time.Second, // 5s shutdown grace period + 20% padding, rounded up + expectTokens: true, + enableACLs: true, + }, { name: "Mesh Gateway", consulSvcName: "service-deleted", @@ -4210,10 +4275,16 @@ func TestReconcileDeleteEndpoint(t *testing.T) { for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { // Add the default
namespace. - ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}} - node := corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} + ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}} + node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} + objs := []runtime.Object{ns, node} + + if tt.pod != nil { + objs = append(objs, tt.pod) + } + // Create fake k8s client. - fakeClient := fake.NewClientBuilder().WithRuntimeObjects(&ns, &node).Build() + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(objs...).Build() // Create test consulServer server adminToken := "123e4567-e89b-12d3-a456-426614174000" @@ -4286,6 +4357,7 @@ func TestReconcileDeleteEndpoint(t *testing.T) { }) require.NoError(t, err) require.False(t, resp.Requeue) + require.Equal(t, tt.requeueAfter, resp.RequeueAfter) // After reconciliation, Consul should not have any instances of service-deleted serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", nil) @@ -4301,10 +4373,21 @@ func TestReconcileDeleteEndpoint(t *testing.T) { require.NotEmpty(t, serviceInstances) } - if tt.enableACLs { + if tt.expectServicesToBeCritical { + checks, _, err := consulClient.Health().Checks(tt.consulSvcName, nil) + require.NoError(t, err) + require.Equal(t, api.HealthCritical, checks.AggregatedStatus()) + } + + if tt.enableACLs && !tt.expectTokens { _, _, err = consulClient.ACL().TokenRead(token.AccessorID, nil) + require.Error(t, err) require.Contains(t, err.Error(), "ACL not found") } + if tt.expectTokens { + _, _, err = consulClient.ACL().TokenRead(token.AccessorID, nil) + require.NoError(t, err) + } }) } } diff --git a/control-plane/subcommand/inject-connect/command.go b/control-plane/subcommand/inject-connect/command.go index d8e740d3dc..4a9572a087 100644 --- a/control-plane/subcommand/inject-connect/command.go +++ b/control-plane/subcommand/inject-connect/command.go @@ -466,6 +466,7 @@ func (c *Command) Run(args []string) int { EnableNSMirroring: c.flagEnableK8SNSMirroring, NSMirroringPrefix: c.flagK8SNSMirroringPrefix, CrossNSACLPolicy: c.flagCrossNamespaceACLPolicy, + LifecycleConfig: lifecycleConfig, EnableTransparentProxy: c.flagDefaultEnableTransparentProxy, EnableWANFederation: c.flagEnableFederation, TProxyOverwriteProbes: c.flagTransparentProxyDefaultOverwriteProbes, From 870868ab8933f1eac0acfbe4fbc4c659f47e17db Mon Sep 17 00:00:00 2001 From: DanStough Date: Tue, 19 Mar 2024 10:51:15 -0400 Subject: [PATCH 2/2] test: fix TestConnectInject_ProxyLifecycleShutdown --- .../connect/connect_proxy_lifecycle_test.go | 52 +++++++++++-------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/acceptance/tests/connect/connect_proxy_lifecycle_test.go b/acceptance/tests/connect/connect_proxy_lifecycle_test.go index ae70a0fdeb..7a4dae55a7 100644 --- a/acceptance/tests/connect/connect_proxy_lifecycle_test.go +++ b/acceptance/tests/connect/connect_proxy_lifecycle_test.go @@ -12,13 +12,14 @@ import ( "time" "github.com/gruntwork-io/terratest/modules/k8s" + "github.com/hashicorp/consul/sdk/testutil/retry" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/hashicorp/consul-k8s/acceptance/framework/connhelper" "github.com/hashicorp/consul-k8s/acceptance/framework/consul" "github.com/hashicorp/consul-k8s/acceptance/framework/helpers" "github.com/hashicorp/consul-k8s/acceptance/framework/logger" - "github.com/hashicorp/consul/sdk/testutil/retry" - "github.com/stretchr/testify/require" - metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" ) type LifecycleShutdownConfig struct { @@ -33,7 +34,6 @@ const ( // Test the endpoints controller cleans up force-killed pods. func TestConnectInject_ProxyLifecycleShutdown(t *testing.T) { - t.Skipf("skiping this test, will be re-added in a future commit") cfg := suite.Config() cfg.SkipWhenOpenshiftAndCNI(t) @@ -139,7 +139,8 @@ func TestConnectInject_ProxyLifecycleShutdown(t *testing.T) { require.Len(t, pods.Items, 1) clientPodName := pods.Items[0].Name - var terminationGracePeriod int64 = 60 + // We should terminate the pods shortly after envoy gracefully shuts down in our 15s test cases. + var terminationGracePeriod int64 = 16 logger.Logf(t, "killing the %q pod with %dseconds termination grace period", clientPodName, terminationGracePeriod) err = ctx.KubernetesClient(t).CoreV1().Pods(ns).Delete(context.Background(), clientPodName, metav1.DeleteOptions{GracePeriodSeconds: &terminationGracePeriod}) require.NoError(t, err) @@ -154,23 +155,29 @@ func TestConnectInject_ProxyLifecycleShutdown(t *testing.T) { } if gracePeriodSeconds > 0 { - // Ensure outbound requests are still successful during grace - // period. - retry.RunWith(&retry.Timer{Timeout: time.Duration(gracePeriodSeconds) * time.Second, Wait: 2 * time.Second}, t, func(r *retry.R) { - output, err := k8s.RunKubectlAndGetOutputE(t, ctx.KubectlOptions(t), args...) - require.NoError(r, err) - require.Condition(r, func() bool { - exists := false - if strings.Contains(output, "curl: (7) Failed to connect") { - exists = true + // Ensure outbound requests are still successful during grace period. + gracePeriodTimer := time.NewTimer(time.Duration(gracePeriodSeconds) * time.Second) + gracePeriodLoop: + for { + select { + case <-gracePeriodTimer.C: + break gracePeriodLoop + default: + output, err := k8s.RunKubectlAndGetOutputE(t, ctx.KubectlOptions(t), args...) + require.NoError(t, err) + require.True(t, !strings.Contains(output, "curl: (7) Failed to connect")) + + // If listener draining is enabled, ensure inbound + // requests are rejected during grace period. + if !drainListenersEnabled { + connHelper.TestConnectionSuccess(t) } - return !exists - }) - }) + // TODO: check that the connection is unsuccessful when drainListenersEnabled is true + // dans note: I found it isn't sufficient to use the existing TestConnectionFailureWithoutIntention - // If listener draining is enabled, ensure inbound - // requests are rejected during grace period. - // connHelper.TestConnectionSuccess(t) + time.Sleep(2 * time.Second) + } + } } else { // Ensure outbound requests fail because proxy has terminated retry.RunWith(&retry.Timer{Timeout: time.Duration(terminationGracePeriod) * time.Second, Wait: 2 * time.Second}, t, func(r *retry.R) { @@ -187,7 +194,10 @@ func TestConnectInject_ProxyLifecycleShutdown(t *testing.T) { } logger.Log(t, "ensuring pod is deregistered after termination") - retry.Run(t, func(r *retry.R) { + // We wait an arbitrarily long time here. With the deployment rollout creating additional endpoints reconciles, + // This can cause the re-queued reconcile used to come back and clean up the service registration to be re-re-queued at + // 2-3X the intended grace period. + retry.RunWith(&retry.Timer{Timeout: time.Duration(60) * time.Second, Wait: 2 * time.Second}, t, func(r *retry.R) { for _, name := range []string{ "static-client", "static-client-sidecar-proxy",