diff --git a/connect-inject/container_init.go b/connect-inject/container_init.go index 91f4970493..6e225ca90d 100644 --- a/connect-inject/container_init.go +++ b/connect-inject/container_init.go @@ -178,12 +178,15 @@ consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \ {{- if .NamespaceMirroringEnabled }} {{- /* If namespace mirroring is enabled, the auth method is defined in the default namespace */}} - -namespace="default" + -auth-method-namespace="default" \ {{- else }} - -namespace="{{ .ConsulNamespace }}" + -auth-method-namespace="{{ .ConsulNamespace }}" \ {{- end }} {{- end }} {{- end }} + {{- if .ConsulNamespace }} + -consul-service-namespace="{{ .ConsulNamespace }}" \ + {{- end }} # Generate the envoy bootstrap code /consul/connect-inject/consul connect envoy \ diff --git a/connect-inject/container_init_test.go b/connect-inject/container_init_test.go index 844168c021..b869ed0d6d 100644 --- a/connect-inject/container_init_test.go +++ b/connect-inject/container_init_test.go @@ -168,6 +168,7 @@ func TestHandlerContainerInit_namespacesEnabled(t *testing.T) { export CONSUL_HTTP_ADDR="${HOST_IP}:8500" export CONSUL_GRPC_ADDR="${HOST_IP}:8502" consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \ + -consul-service-namespace="default" \ # Generate the envoy bootstrap code /consul/connect-inject/consul connect envoy \ @@ -192,6 +193,7 @@ consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \ export CONSUL_HTTP_ADDR="${HOST_IP}:8500" export CONSUL_GRPC_ADDR="${HOST_IP}:8502" consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \ + -consul-service-namespace="non-default" \ # Generate the envoy bootstrap code /consul/connect-inject/consul connect envoy \ @@ -218,7 +220,8 @@ export CONSUL_HTTP_ADDR="${HOST_IP}:8500" export CONSUL_GRPC_ADDR="${HOST_IP}:8502" consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \ -acl-auth-method="auth-method" \ - -namespace="non-default" + -auth-method-namespace="non-default" \ + -consul-service-namespace="non-default" \ # Generate the envoy bootstrap code /consul/connect-inject/consul connect envoy \ @@ -247,7 +250,8 @@ export CONSUL_HTTP_ADDR="${HOST_IP}:8500" export CONSUL_GRPC_ADDR="${HOST_IP}:8502" consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \ -acl-auth-method="auth-method" \ - -namespace="default" + -auth-method-namespace="default" \ + -consul-service-namespace="k8snamespace" \ # Generate the envoy bootstrap code /consul/connect-inject/consul connect envoy \ diff --git a/connect-inject/endpoints_controller.go b/connect-inject/endpoints_controller.go index 59b20b09e7..793ec25a80 100644 --- a/connect-inject/endpoints_controller.go +++ b/connect-inject/endpoints_controller.go @@ -8,6 +8,7 @@ import ( "github.com/deckarep/golang-set" "github.com/go-logr/logr" "github.com/hashicorp/consul-k8s/consul" + "github.com/hashicorp/consul-k8s/namespaces" "github.com/hashicorp/consul/api" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -48,6 +49,25 @@ type EndpointsController struct { AllowK8sNamespacesSet mapset.Set // Endpoints in the DenyK8sNamespacesSet are ignored. DenyK8sNamespacesSet mapset.Set + // EnableConsulNamespaces indicates that a user is running Consul Enterprise + // with version 1.7+ which supports namespaces. + EnableConsulNamespaces bool + // ConsulDestinationNamespace is the name of the Consul namespace to create + // all config entries in. If EnableNSMirroring is true this is ignored. + ConsulDestinationNamespace string + // EnableNSMirroring causes Consul namespaces to be created to match the + // k8s namespace of any config entry custom resource. Config entries will + // be created in the matching Consul namespace. + EnableNSMirroring bool + // NSMirroringPrefix is an optional prefix that can be added to the Consul + // namespaces created while mirroring. For example, if it is set to "k8s-", + // then the k8s `default` namespace will be mirrored in Consul's + // `k8s-default` namespace. + NSMirroringPrefix string + // CrossNSACLPolicy is the name of the ACL policy to attach to + // any created Consul namespaces to allow cross namespace service discovery. + // Only necessary if ACLs are enabled. + CrossNSACLPolicy string // ReleaseName is the Consul Helm installation release. ReleaseName string // ReleaseNamespace is the namespace where Consul is installed. @@ -77,11 +97,11 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( } return ctrl.Result{}, nil } else if err != nil { - r.Log.Error(err, "failed to get Endpoints from Kubernetes", "name", req.Name, "namespace", req.Namespace) + r.Log.Error(err, "failed to get Endpoints", "name", req.Name, "ns", req.Namespace) return ctrl.Result{}, err } - r.Log.Info("retrieved Kubernetes Endpoints", "endpoints", serviceEndpoints.Name, "endpoints-namespace", serviceEndpoints.Namespace) + r.Log.Info("retrieved", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) // endpointAddressMap stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare // against service instances in Consul to deregister them if they are not in the map. @@ -100,13 +120,13 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( var pod corev1.Pod objectKey := types.NamespacedName{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace} if err = r.Client.Get(ctx, objectKey, &pod); err != nil { - r.Log.Error(err, "failed to get pod from Kubernetes", "pod-name", address.TargetRef.Name) + r.Log.Error(err, "failed to get pod", "name", address.TargetRef.Name) return ctrl.Result{}, err } if hasBeenInjected(pod) { // Create client for Consul agent local to the pod. - client, err := r.remoteConsulClient(pod.Status.HostIP) + client, err := r.remoteConsulClient(pod.Status.HostIP, r.consulNamespace(pod.Namespace)) if err != nil { r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.HostIP) return ctrl.Result{}, err @@ -115,7 +135,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( // Get information from the pod to create service instance registrations. serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints) if err != nil { - r.Log.Error(err, "failed to create service registrations", "endpoints", serviceEndpoints.Name) + r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) return ctrl.Result{}, err } @@ -123,30 +143,31 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( // Note: the order of how we register services is important, // and the connect-proxy service should come after the "main" service // because its alias health check depends on the main service existing. - r.Log.Info("registering service", "service", serviceRegistration.Name) + r.Log.Info("registering service with Consul", "name", serviceRegistration.Name) err = client.Agent().ServiceRegister(serviceRegistration) if err != nil { - r.Log.Error(err, "failed to register service with Consul", "consul-service-name", serviceRegistration.Name) + r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name) return ctrl.Result{}, err } // Register the proxy service instance with the local agent. - r.Log.Info("registering proxy service", "service", proxyServiceRegistration.Name) + r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name) err = client.Agent().ServiceRegister(proxyServiceRegistration) if err != nil { - r.Log.Error(err, "failed to register proxy service with Consul", "consul-proxy-service-name", proxyServiceRegistration.Name) + r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name) return ctrl.Result{}, err } // Update the TTL health check for the service. // This is required because ServiceRegister() does not update the TTL if the service already exists. - r.Log.Info("updating ttl health check", "service", serviceRegistration.Name) + r.Log.Info("updating TTL health check for service", "name", serviceRegistration.Name) status, reason, err := getReadyStatusAndReason(pod) if err != nil { return ctrl.Result{}, err } err = client.Agent().UpdateTTL(getConsulHealthCheckID(pod, serviceRegistration.ID), reason, status) if err != nil { + r.Log.Error(err, "failed to update TTL health check", "name", serviceRegistration.Name) return ctrl.Result{}, err } } @@ -158,7 +179,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) ( // from Consul. This uses endpointAddressMap which is populated with the addresses in the Endpoints object during // the registration codepath. if err = r.deregisterServiceOnAllAgents(ctx, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap); err != nil { - r.Log.Error(err, "failed to deregister service instances on all agents", "k8s-service-name", serviceEndpoints.Name, "k8s-namespace", serviceEndpoints.Namespace) + r.Log.Error(err, "failed to deregister endpoints on all agents", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace) return ctrl.Result{}, err } @@ -240,7 +261,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service Port: servicePort, Address: pod.Status.PodIP, Meta: meta, - Namespace: "", // TODO: namespace support + Namespace: r.consulNamespace(pod.Namespace), Check: &api.AgentServiceCheck{ CheckID: getConsulHealthCheckID(pod, serviceID), Name: "Kubernetes Health Check", @@ -299,7 +320,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service Address: pod.Status.PodIP, TaggedAddresses: nil, // TODO: set cluster IP here (will be done later) Meta: meta, - Namespace: "", // TODO: same as service namespace + Namespace: r.consulNamespace(pod.Namespace), Proxy: proxyConfig, Checks: api.AgentServiceChecks{ { @@ -359,9 +380,8 @@ func getReadyStatusAndReason(pod corev1.Pod) (string, string, error) { // them only if they are not in endpointsAddressesMap. If the map is nil, it will deregister all instances. If the map // has addresses, it will only deregister instances not in the map. func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error { - // Get all agents by getting pods with label component=client, app=consul and release= - list := corev1.PodList{} + agents := corev1.PodList{} listOptions := client.ListOptions{ Namespace: r.ReleaseNamespace, LabelSelector: labels.SelectorFromSet(map[string]string{ @@ -370,24 +390,23 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, "release": r.ReleaseName, }), } - if err := r.Client.List(ctx, &list, &listOptions); err != nil { - r.Log.Error(err, "failed to get agent pods from Kubernetes") + if err := r.Client.List(ctx, &agents, &listOptions); err != nil { + r.Log.Error(err, "failed to get Consul client agent pods") return err } // On each agent, we need to get services matching "k8s-service-name" and "k8s-namespace" metadata. - for _, pod := range list.Items { - // Create client for this agent. - client, err := r.remoteConsulClient(pod.Status.PodIP) + for _, agent := range agents.Items { + client, err := r.remoteConsulClient(agent.Status.PodIP, r.consulNamespace(k8sSvcNamespace)) if err != nil { - r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.PodIP) + r.Log.Error(err, "failed to create a new Consul client", "address", agent.Status.PodIP) return err } // Get services matching metadata. svcs, err := serviceInstancesForK8SServiceNameAndNamespace(k8sSvcName, k8sSvcNamespace, client) if err != nil { - r.Log.Error(err, "failed to get service instances", MetaKeyKubeServiceName, k8sSvcName) + r.Log.Error(err, "failed to get service instances", "name", k8sSvcName) return err } @@ -399,13 +418,13 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, if _, ok := endpointsAddressesMap[serviceRegistration.Address]; !ok { // If the service address is not in the Endpoints addresses, deregister it. if err = client.Agent().ServiceDeregister(svcID); err != nil { - r.Log.Error(err, "failed to deregister service instance", "consul-service-id", svcID) + r.Log.Error(err, "failed to deregister service instance", "id", svcID) return err } } } else { if err = client.Agent().ServiceDeregister(svcID); err != nil { - r.Log.Error(err, "failed to deregister service instance", "consul-service-id", svcID) + r.Log.Error(err, "failed to deregister service instance", "id", svcID) return err } } @@ -430,7 +449,7 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream, for _, raw := range strings.Split(raw, ",") { parts := strings.SplitN(raw, ":", 3) - var datacenter, serviceName, preparedQuery string + var datacenter, serviceName, preparedQuery, namespace string var port int32 if strings.TrimSpace(parts[0]) == "prepared_query" { port, _ = portValue(pod, strings.TrimSpace(parts[2])) @@ -438,9 +457,17 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream, } else { port, _ = portValue(pod, strings.TrimSpace(parts[1])) - // TODO: Parse the namespace if provided - - serviceName = strings.TrimSpace(parts[0]) + // If Consul Namespaces are enabled, attempt to parse the + // upstream for a namespace. + if r.EnableConsulNamespaces { + pieces := strings.SplitN(parts[0], ".", 2) + serviceName = strings.TrimSpace(pieces[0]) + if len(pieces) > 1 { + namespace = strings.TrimSpace(pieces[1]) + } + } else { + serviceName = strings.TrimSpace(parts[0]) + } // parse the optional datacenter if len(parts) > 2 { @@ -469,7 +496,7 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream, if port > 0 { upstream := api.Upstream{ DestinationType: api.UpstreamDestTypeService, - DestinationNamespace: "", // todo + DestinationNamespace: namespace, DestinationName: serviceName, Datacenter: datacenter, LocalBindPort: int(port), @@ -488,12 +515,12 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream, return upstreams, nil } -// remoteConsulClient returns an *api.Client that points at the consul agent local to the pod. -func (r *EndpointsController) remoteConsulClient(ip string) (*api.Client, error) { +// remoteConsulClient returns an *api.Client that points at the consul agent local to the pod for a provided namespace. +func (r *EndpointsController) remoteConsulClient(ip string, namespace string) (*api.Client, error) { newAddr := fmt.Sprintf("%s://%s:%s", r.ConsulScheme, ip, r.ConsulPort) localConfig := r.ConsulClientCfg localConfig.Address = newAddr - + localConfig.Namespace = namespace return consul.NewClient(localConfig) } @@ -553,20 +580,20 @@ func (r EndpointsController) filterAgentPods(object client.Object) bool { // for client agent pods where the Ready condition is true. func (r EndpointsController) requestsForRunningAgentPods(object client.Object) []ctrl.Request { var consulClientPod corev1.Pod - r.Log.Info("received update for consulClientPod", "podName", object.GetName()) + r.Log.Info("received update for Consul client pod", "name", object.GetName()) err := r.Client.Get(r.Context, types.NamespacedName{Name: object.GetName(), Namespace: object.GetNamespace()}, &consulClientPod) if k8serrors.IsNotFound(err) { // Ignore if consulClientPod is not found. return []ctrl.Request{} } if err != nil { - r.Log.Error(err, "failed to get consulClientPod", "consulClientPod", consulClientPod.Name) + r.Log.Error(err, "failed to get Consul client pod", "name", consulClientPod.Name) return []ctrl.Request{} } // We can ignore the agent pod if it's not running, since // we can't reconcile and register/deregister services against that agent. if consulClientPod.Status.Phase != corev1.PodRunning { - r.Log.Info("ignoring consulClientPod because it's not running", "consulClientPod", consulClientPod.Name) + r.Log.Info("ignoring Consul client pod because it's not running", "name", consulClientPod.Name) return []ctrl.Request{} } // We can ignore the agent pod if it's not yet ready, since @@ -574,7 +601,7 @@ func (r EndpointsController) requestsForRunningAgentPods(object client.Object) [ for _, cond := range consulClientPod.Status.Conditions { if cond.Type == corev1.PodReady && cond.Status != corev1.ConditionTrue { // Ignore if consulClientPod is not ready. - r.Log.Info("ignoring consulClientPod because it's not ready", "consulClientPod", consulClientPod.Name) + r.Log.Info("ignoring Consul client pod because it's not ready", "name", consulClientPod.Name) return []ctrl.Request{} } } @@ -605,6 +632,12 @@ func (r EndpointsController) requestsForRunningAgentPods(object client.Object) [ return requests } +// consulNamespace returns the Consul destination namespace for a provided Kubernetes namespace +// depending on Consul Namespaces being enabled and the value of namespace mirroring. +func (r *EndpointsController) consulNamespace(namespace string) string { + return namespaces.ConsulNamespace(namespace, r.EnableConsulNamespaces, r.ConsulDestinationNamespace, r.EnableNSMirroring, r.NSMirroringPrefix) +} + // hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected. func hasBeenInjected(pod corev1.Pod) bool { if anno, ok := pod.Annotations[keyInjectStatus]; ok { diff --git a/connect-inject/endpoints_controller_ent_test.go b/connect-inject/endpoints_controller_ent_test.go new file mode 100644 index 0000000000..a997d12ead --- /dev/null +++ b/connect-inject/endpoints_controller_ent_test.go @@ -0,0 +1,1230 @@ +// +build enterprise + +package connectinject + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/deckarep/golang-set" + logrtest "github.com/go-logr/logr/testing" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/hashicorp/consul-k8s/namespaces" + "github.com/hashicorp/consul-k8s/subcommand/common" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +// TestReconcileCreateEndpoint tests the logic to create service instances in Consul from the addresses in the Endpoints +// object. The cases test a basic endpoints object with two addresses. This test verifies that the services and their TTL +// health checks are created in the expected Consul namespace for various combinations of namespace flags. +// This test covers EndpointsController.createServiceRegistrations. +func TestReconcileCreateEndpointWithNamespaces(t *testing.T) { + t.Parallel() + nodeName := "test-node" + cases := map[string]struct { + Mirror bool + MirrorPrefix string + SourceKubeNS string + DestConsulNS string + ExpConsulNS string + }{ + "SourceKubeNS=default, DestConsulNS=default": { + SourceKubeNS: "default", + DestConsulNS: "default", + ExpConsulNS: "default", + }, + "SourceKubeNS=kube, DestConsulNS=default": { + SourceKubeNS: "kube", + DestConsulNS: "default", + ExpConsulNS: "default", + }, + "SourceKubeNS=default, DestConsulNS=other": { + SourceKubeNS: "default", + DestConsulNS: "other", + ExpConsulNS: "other", + }, + "SourceKubeNS=kube, DestConsulNS=other": { + SourceKubeNS: "kube", + DestConsulNS: "other", + ExpConsulNS: "other", + }, + "SourceKubeNS=default, Mirror=true": { + SourceKubeNS: "default", + Mirror: true, + ExpConsulNS: "default", + }, + "SourceKubeNS=kube, Mirror=true": { + SourceKubeNS: "kube", + Mirror: true, + ExpConsulNS: "kube", + }, + "SourceKubeNS=default, Mirror=true, Prefix=prefix": { + SourceKubeNS: "default", + Mirror: true, + MirrorPrefix: "prefix-", + ExpConsulNS: "prefix-default", + }, + } + for name, test := range cases { + setup := struct { + consulSvcName string + k8sObjects func() []runtime.Object + initialConsulSvcs []*api.AgentServiceRegistration + expectedNumSvcInstances int + expectedConsulSvcInstances []*api.CatalogService + expectedProxySvcInstances []*api.CatalogService + expectedAgentHealthChecks []*api.AgentCheck + }{ + consulSvcName: "service-created", + k8sObjects: func() []runtime.Object { + pod1 := createPodWithNamespace("pod1", test.SourceKubeNS, "1.2.3.4", true) + pod2 := createPodWithNamespace("pod2", test.SourceKubeNS, "2.2.3.4", true) + endpointWithTwoAddresses := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-created", + Namespace: test.SourceKubeNS, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: test.SourceKubeNS, + }, + }, + { + IP: "2.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod2", + Namespace: test.SourceKubeNS, + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, pod2, endpointWithTwoAddresses} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{}, + expectedNumSvcInstances: 2, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-created", + ServiceName: "service-created", + ServiceAddress: "1.2.3.4", + ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: test.SourceKubeNS}, + ServiceTags: []string{}, + Namespace: test.ExpConsulNS, + }, + { + ServiceID: "pod2-service-created", + ServiceName: "service-created", + ServiceAddress: "2.2.3.4", + ServiceMeta: map[string]string{MetaKeyPodName: "pod2", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: test.SourceKubeNS}, + ServiceTags: []string{}, + Namespace: test.ExpConsulNS, + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-created-sidecar-proxy", + ServiceName: "service-created-sidecar-proxy", + ServiceAddress: "1.2.3.4", + ServicePort: 20000, + ServiceProxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-created", + DestinationServiceID: "pod1-service-created", + }, + ServiceMeta: map[string]string{MetaKeyPodName: "pod1", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: test.SourceKubeNS}, + ServiceTags: []string{}, + Namespace: test.ExpConsulNS, + }, + { + ServiceID: "pod2-service-created-sidecar-proxy", + ServiceName: "service-created-sidecar-proxy", + ServiceAddress: "2.2.3.4", + ServicePort: 20000, + ServiceProxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-created", + DestinationServiceID: "pod2-service-created", + }, + ServiceMeta: map[string]string{MetaKeyPodName: "pod2", MetaKeyKubeServiceName: "service-created", MetaKeyKubeNS: test.SourceKubeNS}, + ServiceTags: []string{}, + Namespace: test.ExpConsulNS, + }, + }, + expectedAgentHealthChecks: []*api.AgentCheck{ + { + CheckID: fmt.Sprintf("%s/pod1-service-created/kubernetes-health-check", test.SourceKubeNS), + ServiceName: "service-created", + ServiceID: "pod1-service-created", + Name: "Kubernetes Health Check", + Status: api.HealthPassing, + Output: kubernetesSuccessReasonMsg, + Type: ttl, + Namespace: test.ExpConsulNS, + }, + { + CheckID: fmt.Sprintf("%s/pod2-service-created/kubernetes-health-check", test.SourceKubeNS), + ServiceName: "service-created", + ServiceID: "pod2-service-created", + Name: "Kubernetes Health Check", + Status: api.HealthPassing, + Output: kubernetesSuccessReasonMsg, + Type: ttl, + Namespace: test.ExpConsulNS, + }, + }, + } + t.Run(name, func(t *testing.T) { + // The agent pod needs to have the address 127.0.0.1 so when the + // code gets the agent pods via the label component=client, and + // makes requests against the agent API, it will actually hit the + // test server we have on localhost. + fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false) + fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"} + + // Create fake k8s client. + k8sObjects := append(setup.k8sObjects(), fakeClientPod) + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build() + + // Create test Consul server. + consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { + c.NodeName = nodeName + }) + require.NoError(t, err) + defer consul.Stop() + consul.WaitForLeader(t) + + cfg := &api.Config{ + Address: consul.HTTPAddr, + Namespace: test.ExpConsulNS, + } + consulClient, err := api.NewClient(cfg) + require.NoError(t, err) + addr := strings.Split(consul.HTTPAddr, ":") + consulPort := addr[1] + + _, err = namespaces.EnsureExists(consulClient, test.ExpConsulNS, "") + require.NoError(t, err) + + // Register service and proxy in Consul. + for _, svc := range setup.initialConsulSvcs { + err = consulClient.Agent().ServiceRegister(svc) + require.NoError(t, err) + } + + // Create the endpoints controller. + ep := &EndpointsController{ + Client: fakeClient, + Log: logrtest.TestLogger{T: t}, + ConsulClient: consulClient, + ConsulPort: consulPort, + ConsulScheme: "http", + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + ReleaseName: "consul", + ReleaseNamespace: "default", + ConsulClientCfg: cfg, + EnableConsulNamespaces: true, + ConsulDestinationNamespace: test.DestConsulNS, + EnableNSMirroring: test.Mirror, + NSMirroringPrefix: test.MirrorPrefix, + } + namespacedName := types.NamespacedName{ + Namespace: test.SourceKubeNS, + Name: "service-created", + } + + resp, err := ep.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: namespacedName, + }) + require.NoError(t, err) + require.False(t, resp.Requeue) + + // After reconciliation, Consul should have the service with the correct number of instances. + serviceInstances, _, err := consulClient.Catalog().Service(setup.consulSvcName, "", &api.QueryOptions{Namespace: test.ExpConsulNS}) + require.NoError(t, err) + require.Len(t, serviceInstances, setup.expectedNumSvcInstances) + for i, instance := range serviceInstances { + require.Equal(t, setup.expectedConsulSvcInstances[i].ServiceID, instance.ServiceID) + require.Equal(t, setup.expectedConsulSvcInstances[i].ServiceName, instance.ServiceName) + require.Equal(t, setup.expectedConsulSvcInstances[i].ServiceAddress, instance.ServiceAddress) + require.Equal(t, setup.expectedConsulSvcInstances[i].ServicePort, instance.ServicePort) + require.Equal(t, setup.expectedConsulSvcInstances[i].ServiceMeta, instance.ServiceMeta) + require.Equal(t, setup.expectedConsulSvcInstances[i].ServiceTags, instance.ServiceTags) + } + proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", setup.consulSvcName), "", &api.QueryOptions{ + Namespace: test.ExpConsulNS, + }) + require.NoError(t, err) + require.Len(t, proxyServiceInstances, setup.expectedNumSvcInstances) + for i, instance := range proxyServiceInstances { + require.Equal(t, setup.expectedProxySvcInstances[i].ServiceID, instance.ServiceID) + require.Equal(t, setup.expectedProxySvcInstances[i].ServiceName, instance.ServiceName) + require.Equal(t, setup.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress) + require.Equal(t, setup.expectedProxySvcInstances[i].ServicePort, instance.ServicePort) + require.Equal(t, setup.expectedProxySvcInstances[i].ServiceProxy, instance.ServiceProxy) + require.Equal(t, setup.expectedProxySvcInstances[i].ServiceMeta, instance.ServiceMeta) + require.Equal(t, setup.expectedProxySvcInstances[i].ServiceTags, instance.ServiceTags) + } + + _, checkInfos, err := consulClient.Agent().AgentHealthServiceByName(fmt.Sprintf("%s-sidecar-proxy", setup.consulSvcName)) + expectedChecks := []string{"Proxy Public Listener", "Destination Alias"} + require.NoError(t, err) + require.Len(t, checkInfos, setup.expectedNumSvcInstances) + for _, checkInfo := range checkInfos { + checks := checkInfo.Checks + require.Contains(t, expectedChecks, checks[0].Name) + require.Contains(t, expectedChecks, checks[1].Name) + } + + // Check that the Consul health check was created for the k8s pod. + if setup.expectedAgentHealthChecks != nil { + for i := range setup.expectedConsulSvcInstances { + filter := fmt.Sprintf("CheckID == `%s`", setup.expectedAgentHealthChecks[i].CheckID) + newChecks, _ := consulClient.Agent().Checks() + for key, value := range newChecks { + fmt.Printf("%s:%v\n", key, value) + } + check, err := consulClient.Agent().ChecksWithFilter(filter) + require.NoError(t, err) + require.EqualValues(t, 1, len(check)) + // Ignoring Namespace because the response from ENT includes it and OSS does not. + var ignoredFields = []string{"Node", "Definition", "Namespace"} + require.True(t, cmp.Equal(check[setup.expectedAgentHealthChecks[i].CheckID], setup.expectedAgentHealthChecks[i], cmpopts.IgnoreFields(api.AgentCheck{}, ignoredFields...))) + } + } + }) + } +} + +// Tests updating an Endpoints object when Consul namespaces are enabled. +// - Tests updates via the register codepath: +// - When an address in an Endpoint is updated, that the corresponding service instance in Consul is updated in the correct Consul namespace. +// - When an address is added to an Endpoint, an additional service instance in Consul is registered in the correct Consul namespace. +// - Tests updates via the deregister codepath: +// - When an address is removed from an Endpoint, the corresponding service instance in Consul is deregistered. +// - When an address is removed from an Endpoint *and there are no addresses left in the Endpoint*, the +// corresponding service instance in Consul is deregistered. +// For the register and deregister codepath, this also tests that they work when the Consul service name is different +// from the K8s service name. +// This test covers EndpointsController.deregisterServiceOnAllAgents when services should be selectively deregistered +// since the map will not be nil. This test also runs each test with ACLs+TLS enabled and disabled, since it covers all the cases where a Consul client is created. +func TestReconcileUpdateEndpointWithNamespaces(t *testing.T) { + t.Parallel() + nodeName := "test-node" + cases := map[string]struct { + Mirror bool + MirrorPrefix string + SourceKubeNS string + DestConsulNS string + ExpConsulNS string + }{ + "SourceKubeNS=default, DestConsulNS=default": { + SourceKubeNS: "default", + DestConsulNS: "default", + ExpConsulNS: "default", + }, + "SourceKubeNS=kube, DestConsulNS=default": { + SourceKubeNS: "kube", + DestConsulNS: "default", + ExpConsulNS: "default", + }, + "SourceKubeNS=default, DestConsulNS=other": { + SourceKubeNS: "default", + DestConsulNS: "other", + ExpConsulNS: "other", + }, + "SourceKubeNS=kube, DestConsulNS=other": { + SourceKubeNS: "kube", + DestConsulNS: "other", + ExpConsulNS: "other", + }, + "SourceKubeNS=default, Mirror=true": { + SourceKubeNS: "default", + Mirror: true, + ExpConsulNS: "default", + }, + "SourceKubeNS=kube, Mirror=true": { + SourceKubeNS: "kube", + Mirror: true, + ExpConsulNS: "kube", + }, + "SourceKubeNS=default, Mirror=true, Prefix=prefix": { + SourceKubeNS: "default", + Mirror: true, + MirrorPrefix: "prefix-", + ExpConsulNS: "prefix-default", + }, + } + for name, ts := range cases { + cases := []struct { + name string + consulSvcName string + k8sObjects func() []runtime.Object + initialConsulSvcs []*api.AgentServiceRegistration + expectedNumSvcInstances int + expectedConsulSvcInstances []*api.CatalogService + expectedProxySvcInstances []*api.CatalogService + }{ + { + name: "Endpoints has an updated address (pod IP change).", + consulSvcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "4.4.4.4", true) + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: ts.SourceKubeNS, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "4.4.4.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: ts.SourceKubeNS, + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-updated", + Name: "service-updated", + Port: 80, + Address: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod1-service-updated", + }, + Namespace: ts.ExpConsulNS, + }, + }, + expectedNumSvcInstances: 1, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated", + ServiceAddress: "4.4.4.4", + Namespace: ts.ExpConsulNS, + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated-sidecar-proxy", + ServiceAddress: "4.4.4.4", + Namespace: ts.ExpConsulNS, + }, + }, + }, + { + name: "Different Consul service name: Endpoints has an updated address (pod IP change).", + consulSvcName: "different-consul-svc-name", + k8sObjects: func() []runtime.Object { + pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "4.4.4.4", true) + pod1.Annotations[annotationService] = "different-consul-svc-name" + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: ts.SourceKubeNS, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "4.4.4.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: ts.SourceKubeNS, + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-different-consul-svc-name", + Name: "different-consul-svc-name", + Port: 80, + Address: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-different-consul-svc-name-sidecar-proxy", + Name: "different-consul-svc-name-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "different-consul-svc-name", + DestinationServiceID: "pod1-different-consul-svc-name", + }, + Namespace: ts.ExpConsulNS, + }, + }, + expectedNumSvcInstances: 1, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-different-consul-svc-name", + ServiceAddress: "4.4.4.4", + Namespace: ts.ExpConsulNS, + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-different-consul-svc-name-sidecar-proxy", + ServiceAddress: "4.4.4.4", + Namespace: ts.ExpConsulNS, + }, + }, + }, + { + name: "Endpoints has additional address not in Consul.", + consulSvcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "1.2.3.4", true) + pod2 := createPodWithNamespace("pod2", ts.SourceKubeNS, "2.2.3.4", true) + endpointWithTwoAddresses := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: ts.SourceKubeNS, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: ts.SourceKubeNS, + }, + }, + { + IP: "2.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod2", + Namespace: ts.SourceKubeNS, + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, pod2, endpointWithTwoAddresses} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-updated", + Name: "service-updated", + Port: 80, + Address: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod1-service-updated", + }, + Namespace: ts.ExpConsulNS, + }, + }, + expectedNumSvcInstances: 2, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated", + ServiceAddress: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + { + ServiceID: "pod2-service-updated", + ServiceAddress: "2.2.3.4", + Namespace: ts.ExpConsulNS, + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated-sidecar-proxy", + ServiceAddress: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + { + ServiceID: "pod2-service-updated-sidecar-proxy", + ServiceAddress: "2.2.3.4", + Namespace: ts.ExpConsulNS, + }, + }, + }, + { + name: "Consul has instances that are not in the Endpoints addresses.", + consulSvcName: "service-updated", + k8sObjects: func() []runtime.Object { + pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "1.2.3.4", true) + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: ts.SourceKubeNS, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: ts.SourceKubeNS, + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-updated", + Name: "service-updated", + Port: 80, + Address: "1.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod1-service-updated", + }, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + ID: "pod2-service-updated", + Name: "service-updated", + Port: 80, + Address: "2.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod2-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "2.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod2-service-updated", + }, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + }, + expectedNumSvcInstances: 1, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated", + ServiceAddress: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-service-updated-sidecar-proxy", + ServiceAddress: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + }, + }, + { + name: "Different Consul service name: Consul has instances that are not in the Endpoints addresses.", + consulSvcName: "different-consul-svc-name", + k8sObjects: func() []runtime.Object { + pod1 := createPodWithNamespace("pod1", ts.SourceKubeNS, "1.2.3.4", true) + pod1.Annotations[annotationService] = "different-consul-svc-name" + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: ts.SourceKubeNS, + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: ts.SourceKubeNS, + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-different-consul-svc-name", + Name: "different-consul-svc-name", + Port: 80, + Address: "1.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-different-consul-svc-name-sidecar-proxy", + Name: "different-consul-svc-name-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "different-consul-svc-name", + DestinationServiceID: "pod1-different-consul-svc-name", + }, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + ID: "pod2-different-consul-svc-name", + Name: "different-consul-svc-name", + Port: 80, + Address: "2.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod2-different-consul-svc-name-sidecar-proxy", + Name: "different-consul-svc-name-sidecar-proxy", + Port: 20000, + Address: "2.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "different-consul-svc-name", + DestinationServiceID: "pod2-different-consul-svc-name", + }, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + }, + expectedNumSvcInstances: 1, + expectedConsulSvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-different-consul-svc-name", + ServiceAddress: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + }, + expectedProxySvcInstances: []*api.CatalogService{ + { + ServiceID: "pod1-different-consul-svc-name-sidecar-proxy", + ServiceAddress: "1.2.3.4", + Namespace: ts.ExpConsulNS, + }, + }, + }, + { + // When a k8s deployment is deleted but it's k8s service continues to exist, the endpoints has no addresses + // and the instances should be deleted from Consul. + name: "Consul has instances that are not in the endpoints, and the endpoints has no addresses.", + consulSvcName: "service-updated", + k8sObjects: func() []runtime.Object { + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: ts.SourceKubeNS, + }, + } + return []runtime.Object{endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-updated", + Name: "service-updated", + Port: 80, + Address: "1.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod1-service-updated", + }, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + ID: "pod2-service-updated", + Name: "service-updated", + Port: 80, + Address: "2.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod2-service-updated-sidecar-proxy", + Name: "service-updated-sidecar-proxy", + Port: 20000, + Address: "2.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-updated", + DestinationServiceID: "pod2-service-updated", + }, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + }, + expectedNumSvcInstances: 0, + expectedConsulSvcInstances: []*api.CatalogService{}, + expectedProxySvcInstances: []*api.CatalogService{}, + }, + { + // With a different Consul service name, when a k8s deployment is deleted but it's k8s service continues to + // exist, the endpoints has no addresses and the instances should be deleted from Consul. + name: "Different Consul service name: Consul has instances that are not in the endpoints, and the endpoints has no addresses.", + consulSvcName: "different-consul-svc-name", + k8sObjects: func() []runtime.Object { + endpoint := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-updated", + Namespace: ts.SourceKubeNS, + }, + } + return []runtime.Object{endpoint} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-different-consul-svc-name", + Name: "different-consul-svc-name", + Port: 80, + Address: "1.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-different-consul-svc-name-sidecar-proxy", + Name: "different-consul-svc-name-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "different-consul-svc-name", + DestinationServiceID: "pod1-different-consul-svc-name", + }, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + ID: "pod2-different-consul-svc-name", + Name: "different-consul-svc-name", + Port: 80, + Address: "2.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod2-different-consul-svc-name-sidecar-proxy", + Name: "different-consul-svc-name-sidecar-proxy", + Port: 20000, + Address: "2.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "different-consul-svc-name", + DestinationServiceID: "pod2-different-consul-svc-name", + }, + Meta: map[string]string{"k8s-service-name": "service-updated", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + }, + expectedNumSvcInstances: 0, + expectedConsulSvcInstances: []*api.CatalogService{}, + expectedProxySvcInstances: []*api.CatalogService{}, + }, + } + for _, secure := range []bool{true, false} { + for _, tt := range cases { + t.Run(fmt.Sprintf("%s: %s - secure: %v", name, tt.name, secure), func(t *testing.T) { + // The agent pod needs to have the address 127.0.0.1 so when the + // code gets the agent pods via the label component=client, and + // makes requests against the agent API, it will actually hit the + // test server we have on localhost. + fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false) + fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"} + + // Create fake k8s client. + k8sObjects := append(tt.k8sObjects(), fakeClientPod) + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build() + + masterToken := "b78d37c7-0ca7-5f4d-99ee-6d9975ce4586" + caFile, certFile, keyFile := common.GenerateServerCerts(t) + // Create test consul server, with ACLs+TLS if necessary. + consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { + if secure { + c.ACL.Enabled = true + c.ACL.DefaultPolicy = "deny" + c.ACL.Tokens.Master = masterToken + c.CAFile = caFile + c.CertFile = certFile + c.KeyFile = keyFile + } + c.NodeName = nodeName + }) + require.NoError(t, err) + defer consul.Stop() + consul.WaitForSerfCheck(t) + + cfg := &api.Config{ + Scheme: "http", + Address: consul.HTTPAddr, + Namespace: ts.ExpConsulNS, + } + if secure { + cfg.Address = consul.HTTPSAddr + cfg.Scheme = "https" + cfg.TLSConfig = api.TLSConfig{ + CAFile: caFile, + } + cfg.Token = masterToken + } + consulClient, err := api.NewClient(cfg) + require.NoError(t, err) + addr := strings.Split(cfg.Address, ":") + consulPort := addr[1] + + _, err = namespaces.EnsureExists(consulClient, ts.ExpConsulNS, "") + require.NoError(t, err) + + // Register service and proxy in Consul. + for _, svc := range tt.initialConsulSvcs { + err = consulClient.Agent().ServiceRegister(svc) + require.NoError(t, err) + } + + // Create the endpoints controller. + ep := &EndpointsController{ + Client: fakeClient, + Log: logrtest.TestLogger{T: t}, + ConsulClient: consulClient, + ConsulPort: consulPort, + ConsulScheme: cfg.Scheme, + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + ReleaseName: "consul", + ReleaseNamespace: "default", + ConsulClientCfg: cfg, + EnableConsulNamespaces: true, + EnableNSMirroring: ts.Mirror, + NSMirroringPrefix: ts.MirrorPrefix, + ConsulDestinationNamespace: ts.DestConsulNS, + } + namespacedName := types.NamespacedName{ + Namespace: ts.SourceKubeNS, + Name: "service-updated", + } + + resp, err := ep.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: namespacedName, + }) + require.NoError(t, err) + require.False(t, resp.Requeue) + + // After reconciliation, Consul should have service-updated with the correct number of instances. + serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", &api.QueryOptions{Namespace: ts.ExpConsulNS}) + require.NoError(t, err) + require.Len(t, serviceInstances, tt.expectedNumSvcInstances) + for i, instance := range serviceInstances { + require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceID, instance.ServiceID) + require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceAddress, instance.ServiceAddress) + } + proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", tt.consulSvcName), "", &api.QueryOptions{Namespace: ts.ExpConsulNS}) + require.NoError(t, err) + require.Len(t, proxyServiceInstances, tt.expectedNumSvcInstances) + for i, instance := range proxyServiceInstances { + require.Equal(t, tt.expectedProxySvcInstances[i].ServiceID, instance.ServiceID) + require.Equal(t, tt.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress) + } + }) + } + } + } +} + +// Tests deleting an Endpoints object, with and without matching Consul and K8s service names when Consul namespaces are enabled. +// This test covers EndpointsController.deregisterServiceOnAllAgents when the map is nil (not selectively deregistered). +func TestReconcileDeleteEndpointWithNamespaces(t *testing.T) { + t.Parallel() + nodeName := "test-node" + cases := map[string]struct { + Mirror bool + MirrorPrefix string + SourceKubeNS string + DestConsulNS string + ExpConsulNS string + }{ + "SourceKubeNS=default, DestConsulNS=default": { + SourceKubeNS: "default", + DestConsulNS: "default", + ExpConsulNS: "default", + }, + "SourceKubeNS=kube, DestConsulNS=default": { + SourceKubeNS: "kube", + DestConsulNS: "default", + ExpConsulNS: "default", + }, + "SourceKubeNS=default, DestConsulNS=other": { + SourceKubeNS: "default", + DestConsulNS: "other", + ExpConsulNS: "other", + }, + "SourceKubeNS=kube, DestConsulNS=other": { + SourceKubeNS: "kube", + DestConsulNS: "other", + ExpConsulNS: "other", + }, + "SourceKubeNS=default, Mirror=true": { + SourceKubeNS: "default", + Mirror: true, + ExpConsulNS: "default", + }, + "SourceKubeNS=kube, Mirror=true": { + SourceKubeNS: "kube", + Mirror: true, + ExpConsulNS: "kube", + }, + "SourceKubeNS=default, Mirror=true, Prefix=prefix": { + SourceKubeNS: "default", + Mirror: true, + MirrorPrefix: "prefix-", + ExpConsulNS: "prefix-default", + }, + } + for name, ts := range cases { + cases := []struct { + name string + consulSvcName string + initialConsulSvcs []*api.AgentServiceRegistration + }{ + { + name: "Consul service name matches K8s service name", + consulSvcName: "service-deleted", + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-service-deleted", + Name: "service-deleted", + Port: 80, + Address: "1.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-service-deleted-sidecar-proxy", + Name: "service-deleted-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "service-deleted", + DestinationServiceID: "pod1-service-deleted", + }, + Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + }, + }, + { + name: "Consul service name does not match K8s service name", + consulSvcName: "different-consul-svc-name", + initialConsulSvcs: []*api.AgentServiceRegistration{ + { + ID: "pod1-different-consul-svc-name", + Name: "different-consul-svc-name", + Port: 80, + Address: "1.2.3.4", + Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + { + Kind: api.ServiceKindConnectProxy, + ID: "pod1-different-consul-svc-name-sidecar-proxy", + Name: "different-consul-svc-name-sidecar-proxy", + Port: 20000, + Address: "1.2.3.4", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "different-consul-svc-name", + DestinationServiceID: "pod1-different-consul-svc-name", + }, + Meta: map[string]string{"k8s-service-name": "service-deleted", "k8s-namespace": ts.SourceKubeNS}, + Namespace: ts.ExpConsulNS, + }, + }, + }, + } + for _, tt := range cases { + t.Run(fmt.Sprintf("%s:%s", name, tt.name), func(t *testing.T) { + // The agent pod needs to have the address 127.0.0.1 so when the + // code gets the agent pods via the label component=client, and + // makes requests against the agent API, it will actually hit the + // test server we have on localhost. + fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false) + fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"} + + // Create fake k8s client. + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(fakeClientPod).Build() + + // Create test Consul server. + consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { + c.NodeName = nodeName + }) + require.NoError(t, err) + defer consul.Stop() + + consul.WaitForLeader(t) + cfg := &api.Config{ + Address: consul.HTTPAddr, + Namespace: ts.ExpConsulNS, + } + consulClient, err := api.NewClient(cfg) + require.NoError(t, err) + addr := strings.Split(consul.HTTPAddr, ":") + consulPort := addr[1] + + _, err = namespaces.EnsureExists(consulClient, ts.ExpConsulNS, "") + require.NoError(t, err) + + // Register service and proxy in consul. + for _, svc := range tt.initialConsulSvcs { + err = consulClient.Agent().ServiceRegister(svc) + require.NoError(t, err) + } + + // Create the endpoints controller. + ep := &EndpointsController{ + Client: fakeClient, + Log: logrtest.TestLogger{T: t}, + ConsulClient: consulClient, + ConsulPort: consulPort, + ConsulScheme: "http", + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + ReleaseName: "consul", + ReleaseNamespace: "default", + ConsulClientCfg: cfg, + EnableConsulNamespaces: true, + EnableNSMirroring: ts.Mirror, + NSMirroringPrefix: ts.MirrorPrefix, + ConsulDestinationNamespace: ts.DestConsulNS, + } + + // Set up the Endpoint that will be reconciled, and reconcile. + namespacedName := types.NamespacedName{ + Namespace: ts.SourceKubeNS, + Name: "service-deleted", + } + resp, err := ep.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: namespacedName, + }) + require.NoError(t, err) + require.False(t, resp.Requeue) + + // After reconciliation, Consul should not have any instances of service-deleted. + serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", &api.QueryOptions{Namespace: ts.ExpConsulNS}) + require.NoError(t, err) + require.Empty(t, serviceInstances) + proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", tt.consulSvcName), "", &api.QueryOptions{Namespace: ts.ExpConsulNS}) + require.NoError(t, err) + require.Empty(t, proxyServiceInstances) + + }) + } + } +} + +func createPodWithNamespace(name, namespace, ip string, inject bool) *corev1.Pod { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + Status: corev1.PodStatus{ + PodIP: ip, + HostIP: "127.0.0.1", + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + if inject { + pod.Labels[keyInjectStatus] = injected + pod.Annotations[keyInjectStatus] = injected + } + return pod + +} diff --git a/connect-inject/endpoints_controller_test.go b/connect-inject/endpoints_controller_test.go index f3bf5cf7c2..edea1b20ac 100644 --- a/connect-inject/endpoints_controller_test.go +++ b/connect-inject/endpoints_controller_test.go @@ -186,12 +186,13 @@ func TestProcessUpstreams(t *testing.T) { t.Parallel() nodeName := "test-node" cases := []struct { - name string - pod func() *corev1.Pod - expected []api.Upstream - expErr string - configEntry func() api.ConfigEntry - consulUnavailable bool + name string + pod func() *corev1.Pod + expected []api.Upstream + expErr string + configEntry func() api.ConfigEntry + consulUnavailable bool + consulNamespacesEnabled bool }{ { name: "upstream with datacenter without ProxyDefaults", @@ -200,7 +201,8 @@ func TestProcessUpstreams(t *testing.T) { pod1.Annotations[annotationUpstreams] = "upstream1:1234:dc1" return pod1 }, - expErr: "upstream \"upstream1:1234:dc1\" is invalid: there is no ProxyDefaults config to set mesh gateway mode", + expErr: "upstream \"upstream1:1234:dc1\" is invalid: there is no ProxyDefaults config to set mesh gateway mode", + consulNamespacesEnabled: false, }, { name: "upstream with datacenter with ProxyDefaults whose mesh gateway mode is not local or remote", @@ -216,6 +218,7 @@ func TestProcessUpstreams(t *testing.T) { pd.MeshGateway.Mode = "bad-mode" return pd }, + consulNamespacesEnabled: false, }, { name: "upstream with datacenter with ProxyDefaults and mesh gateway is in local mode", @@ -238,6 +241,7 @@ func TestProcessUpstreams(t *testing.T) { pd.MeshGateway.Mode = api.MeshGatewayModeLocal return pd }, + consulNamespacesEnabled: false, }, { name: "upstream with datacenter with ProxyDefaults and mesh gateway in remote mode", @@ -260,10 +264,10 @@ func TestProcessUpstreams(t *testing.T) { pd.MeshGateway.Mode = api.MeshGatewayModeRemote return pd }, + consulNamespacesEnabled: false, }, { - name: "when consul is unavailable, we don't return an error", - consulUnavailable: true, + name: "when consul is unavailable, we don't return an error", pod: func() *corev1.Pod { pod1 := createPod("pod1", "1.2.3.4", true) pod1.Annotations[annotationUpstreams] = "upstream1:1234:dc1" @@ -284,6 +288,8 @@ func TestProcessUpstreams(t *testing.T) { Datacenter: "dc1", }, }, + consulUnavailable: true, + consulNamespacesEnabled: false, }, { name: "single upstream", @@ -299,6 +305,24 @@ func TestProcessUpstreams(t *testing.T) { LocalBindPort: 1234, }, }, + consulNamespacesEnabled: false, + }, + { + name: "single upstream with namespace", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true) + pod1.Annotations[annotationUpstreams] = "upstream.foo:1234" + return pod1 + }, + expected: []api.Upstream{ + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream", + LocalBindPort: 1234, + DestinationNamespace: "foo", + }, + }, + consulNamespacesEnabled: true, }, { name: "multiple upstreams", @@ -319,6 +343,41 @@ func TestProcessUpstreams(t *testing.T) { LocalBindPort: 2234, }, }, + consulNamespacesEnabled: false, + }, + { + name: "multiple upstreams with consul namespaces and datacenters", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true) + pod1.Annotations[annotationUpstreams] = "upstream1:1234, upstream2.bar:2234, upstream3.foo:3234:dc2" + return pod1 + }, + configEntry: func() api.ConfigEntry { + ce, _ := api.MakeConfigEntry(api.ProxyDefaults, "pd") + pd := ce.(*api.ProxyConfigEntry) + pd.MeshGateway.Mode = "remote" + return pd + }, + expected: []api.Upstream{ + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream1", + LocalBindPort: 1234, + }, + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream2", + DestinationNamespace: "bar", + LocalBindPort: 2234, + }, { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream3", + DestinationNamespace: "foo", + LocalBindPort: 3234, + Datacenter: "dc2", + }, + }, + consulNamespacesEnabled: true, }, { name: "prepared query upstream", @@ -334,6 +393,7 @@ func TestProcessUpstreams(t *testing.T) { LocalBindPort: 1234, }, }, + consulNamespacesEnabled: false, }, { name: "prepared query and non-query upstreams", @@ -359,6 +419,7 @@ func TestProcessUpstreams(t *testing.T) { LocalBindPort: 8202, }, }, + consulNamespacesEnabled: false, }, } for _, tt := range cases { @@ -387,12 +448,13 @@ func TestProcessUpstreams(t *testing.T) { } ep := &EndpointsController{ - Log: logrtest.TestLogger{T: t}, - ConsulClient: consulClient, - ConsulPort: consulPort, - ConsulScheme: "http", - AllowK8sNamespacesSet: mapset.NewSetWith("*"), - DenyK8sNamespacesSet: mapset.NewSetWith(), + Log: logrtest.TestLogger{T: t}, + ConsulClient: consulClient, + ConsulPort: consulPort, + ConsulScheme: "http", + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + EnableConsulNamespaces: tt.consulNamespacesEnabled, } upstreams, err := ep.processUpstreams(*tt.pod()) diff --git a/go.mod b/go.mod index 0dd38a6377..d55fbf83f3 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,6 @@ require ( github.com/mitchellh/mapstructure v1.4.1 // indirect github.com/stretchr/testify v1.6.1 go.uber.org/zap v1.15.0 - golang.org/x/net v0.0.0-20201110031124-69a78807bb2b golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 // indirect golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c // indirect golang.org/x/tools v0.0.0-20200616195046-dc31b401abb5 // indirect diff --git a/subcommand/common/common.go b/subcommand/common/common.go index 24f3f81257..70c3000a5b 100644 --- a/subcommand/common/common.go +++ b/subcommand/common/common.go @@ -52,7 +52,7 @@ func ValidateUnprivilegedPort(flagName, flagValue string) error { // ConsulLogin issues an ACL().Login to Consul and writes out the token to tokenSinkFile. // The logic of this is taken from the `consul login` command. -func ConsulLogin(client *api.Client, bearerTokenFile, authMethodName, tokenSinkFile string, meta map[string]string) error { +func ConsulLogin(client *api.Client, bearerTokenFile, authMethodName, tokenSinkFile, namespace string, meta map[string]string) error { if meta == nil { return fmt.Errorf("invalid meta") } @@ -70,7 +70,7 @@ func ConsulLogin(client *api.Client, bearerTokenFile, authMethodName, tokenSinkF BearerToken: bearerToken, Meta: meta, } - tok, _, err := client.ACL().Login(req, nil) + tok, _, err := client.ACL().Login(req, &api.WriteOptions{Namespace: namespace}) if err != nil { return fmt.Errorf("error logging in: %s", err) } diff --git a/subcommand/common/common_test.go b/subcommand/common/common_test.go index 05820b9723..f8cccd8db1 100644 --- a/subcommand/common/common_test.go +++ b/subcommand/common/common_test.go @@ -46,13 +46,7 @@ func TestConsulLogin(t *testing.T) { tokenFile := WriteTempFile(t, "") client := startMockServer(t, &counter) - err := ConsulLogin( - client, - bearerTokenFile, - testAuthMethod, - tokenFile, - testPodMeta, - ) + err := ConsulLogin(client, bearerTokenFile, testAuthMethod, tokenFile, "", testPodMeta) require.NoError(err) require.Equal(counter, 1) // Validate that the token file was written to disk. @@ -71,6 +65,7 @@ func TestConsulLogin_EmptyBearerTokenFile(t *testing.T) { bearerTokenFile, testAuthMethod, "", + "", testPodMeta, ) require.EqualError(err, fmt.Sprintf("no bearer token found in %s", bearerTokenFile)) @@ -85,6 +80,7 @@ func TestConsulLogin_BearerTokenFileDoesNotExist(t *testing.T) { randFileName, testAuthMethod, "", + "", testPodMeta, ) require.Error(err) @@ -103,6 +99,7 @@ func TestConsulLogin_TokenFileUnwritable(t *testing.T) { bearerTokenFile, testAuthMethod, randFileName, + "", testPodMeta, ) require.Error(err) diff --git a/subcommand/connect-init/command.go b/subcommand/connect-init/command.go index 276315ae06..f2da924f28 100644 --- a/subcommand/connect-init/command.go +++ b/subcommand/connect-init/command.go @@ -30,9 +30,11 @@ const ( type Command struct { UI cli.Ui - flagACLAuthMethod string // Auth Method to use for ACLs, if enabled. - flagPodName string // Pod name. - flagPodNamespace string // Pod namespace. + flagACLAuthMethod string // Auth Method to use for ACLs, if enabled. + flagPodName string // Pod name. + flagPodNamespace string // Pod namespace. + flagAuthMethodNamespace string // Consul namespace the auth-method is defined in. + flagConsulServiceNamespace string // Consul destination namespace for the service. bearerTokenFile string // Location of the bearer token. Default is /var/run/secrets/kubernetes.io/serviceaccount/token. tokenSinkFile string // Location to write the output token. Default is defaultTokenSinkFile. @@ -51,6 +53,8 @@ func (c *Command) init() { c.flagSet.StringVar(&c.flagACLAuthMethod, "acl-auth-method", "", "Name of the auth method to login to.") c.flagSet.StringVar(&c.flagPodName, "pod-name", "", "Name of the pod.") c.flagSet.StringVar(&c.flagPodNamespace, "pod-namespace", "", "Name of the pod namespace.") + c.flagSet.StringVar(&c.flagAuthMethodNamespace, "auth-method-namespace", "", "Consul namespace the auth-method is defined in") + c.flagSet.StringVar(&c.flagConsulServiceNamespace, "consul-service-namespace", "", "Consul destination namespace of the service.") if c.bearerTokenFile == "" { c.bearerTokenFile = defaultBearerTokenFile @@ -86,6 +90,7 @@ func (c *Command) Run(args []string) int { } cfg := api.DefaultConfig() + cfg.Namespace = c.flagConsulServiceNamespace c.http.MergeOntoConfig(cfg) consulClient, err := consul.NewClient(cfg) if err != nil { @@ -98,7 +103,7 @@ func (c *Command) Run(args []string) int { // loginMeta is the default metadata that we pass to the consul login API. loginMeta := map[string]string{"pod": fmt.Sprintf("%s/%s", c.flagPodNamespace, c.flagPodName)} err = backoff.Retry(func() error { - err := common.ConsulLogin(consulClient, c.bearerTokenFile, c.flagACLAuthMethod, c.tokenSinkFile, loginMeta) + err := common.ConsulLogin(consulClient, c.bearerTokenFile, c.flagACLAuthMethod, c.tokenSinkFile, c.flagAuthMethodNamespace, loginMeta) if err != nil { c.UI.Error(fmt.Sprintf("Consul login failed; retrying: %s", err)) } @@ -122,12 +127,7 @@ func (c *Command) Run(args []string) int { // which maps to this pod+namespace. var proxyID string err = backoff.Retry(func() error { - filter := fmt.Sprintf( - "Meta[%q] == %q and Meta[%q] == %q", - connectinject.MetaKeyPodName, - c.flagPodName, - connectinject.MetaKeyKubeNS, - c.flagPodNamespace) + filter := fmt.Sprintf("Meta[%q] == %q and Meta[%q] == %q", connectinject.MetaKeyPodName, c.flagPodName, connectinject.MetaKeyKubeNS, c.flagPodNamespace) serviceList, err := consulClient.Agent().ServicesWithFilter(filter) if err != nil { c.UI.Error(fmt.Sprintf("Unable to get Agent services: %s", err)) diff --git a/subcommand/connect-init/command_ent_test.go b/subcommand/connect-init/command_ent_test.go new file mode 100644 index 0000000000..394b473284 --- /dev/null +++ b/subcommand/connect-init/command_ent_test.go @@ -0,0 +1,307 @@ +// +build enterprise + +package connectinit + +import ( + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "net/http/httptest" + "os" + "testing" + + "github.com/hashicorp/consul-k8s/namespaces" + "github.com/hashicorp/consul-k8s/subcommand/common" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/mitchellh/cli" + "github.com/stretchr/testify/require" +) + +func TestRun_ServicePollingWithACLsAndTLSWithNamespaces(t *testing.T) { + t.Parallel() + cases := []struct { + name string + tls bool + consulServiceNamespace string + authMethod string + authMethodNamespace string + }{ + { + name: "ACLs enabled, no tls, serviceNS=default, authMethodNS=default", + tls: false, + consulServiceNamespace: "default", + authMethodNamespace: "default", + authMethod: "consul-k8s-auth-method", + }, + { + name: "ACLs enabled, tls, serviceNS=default, authMethodNS=default", + tls: true, + consulServiceNamespace: "default", + authMethodNamespace: "default", + authMethod: "consul-k8s-auth-method", + }, + { + name: "ACLs enabled, no tls, serviceNS=default-ns, authMethodNS=default", + tls: false, + consulServiceNamespace: "default-ns", + authMethodNamespace: "default", + authMethod: "consul-k8s-auth-method", + }, + { + name: "ACLs enabled, tls, serviceNS=default-ns, authMethodNS=default", + tls: true, + consulServiceNamespace: "default-ns", + authMethodNamespace: "default", + authMethod: "consul-k8s-auth-method", + }, + { + name: "ACLs enabled, no tls, serviceNS=other, authMethodNS=other", + tls: false, + consulServiceNamespace: "other", + authMethodNamespace: "other", + authMethod: "consul-k8s-auth-method", + }, + { + name: "ACLs enabled, tls, serviceNS=other, authMethodNS=other", + tls: true, + consulServiceNamespace: "other", + authMethodNamespace: "other", + authMethod: "consul-k8s-auth-method", + }, + { + name: "ACLs disabled, no tls, serviceNS=default, authMethodNS=default", + tls: false, + consulServiceNamespace: "default", + authMethodNamespace: "default", + }, + { + name: "ACLs disabled, tls, serviceNS=default, authMethodNS=default", + tls: true, + consulServiceNamespace: "default", + authMethodNamespace: "default", + }, + { + name: "ACLs disabled, no tls, serviceNS=default-ns, authMethodNS=default", + tls: false, + consulServiceNamespace: "default-ns", + authMethodNamespace: "default", + }, + { + name: "ACLs disabled, tls, serviceNS=default-ns, authMethodNS=default", + tls: true, + consulServiceNamespace: "default-ns", + authMethodNamespace: "default", + }, + { + name: "ACLs disabled, no tls, serviceNS=other, authMethodNS=other", + tls: false, + consulServiceNamespace: "other", + authMethodNamespace: "other", + }, + { + name: "ACLs disabled, tls, serviceNS=other, authMethodNS=other", + tls: true, + consulServiceNamespace: "other", + authMethodNamespace: "other", + }, + } + for _, test := range cases { + t.Run(test.name, func(t *testing.T) { + bearerFile := common.WriteTempFile(t, serviceAccountJWTToken) + tokenFile := fmt.Sprintf("/tmp/%d1", rand.Int()) + proxyFile := fmt.Sprintf("/tmp/%d2", rand.Int()) + t.Cleanup(func() { + os.Remove(proxyFile) + os.Remove(tokenFile) + }) + + var caFile, certFile, keyFile string + // Start Consul server with ACLs enabled and default deny policy. + masterToken := "b78d37c7-0ca7-5f4d-99ee-6d9975ce4586" + server, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { + if test.authMethod != "" { + c.ACL.Enabled = true + c.ACL.DefaultPolicy = "deny" + c.ACL.Tokens.Master = masterToken + } + if test.tls { + caFile, certFile, keyFile = common.GenerateServerCerts(t) + c.CAFile = caFile + c.CertFile = certFile + c.KeyFile = keyFile + } + }) + require.NoError(t, err) + defer server.Stop() + server.WaitForLeader(t) + cfg := &api.Config{ + Scheme: "http", + Address: server.HTTPAddr, + Namespace: test.consulServiceNamespace, + } + if test.authMethod != "" { + cfg.Token = masterToken + } + if test.tls { + cfg.Address = server.HTTPSAddr + cfg.Scheme = "https" + cfg.TLSConfig = api.TLSConfig{ + CAFile: caFile, + } + } + + consulClient, err := api.NewClient(cfg) + require.NoError(t, err) + + _, err = namespaces.EnsureExists(consulClient, test.consulServiceNamespace, "") + require.NoError(t, err) + + // Start the mock k8s server. + k8sMockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "application/json") + if r != nil && r.URL.Path == "/apis/authentication.k8s.io/v1/tokenreviews" && r.Method == "POST" { + w.Write([]byte(tokenReviewFoundResponseForNamespaces)) + } + if r != nil && r.URL.Path == "/api/v1/namespaces/default-ns/serviceaccounts/counting" && r.Method == "GET" { + w.Write([]byte(readServiceAccountFoundForNamespaces)) + } + })) + defer k8sMockServer.Close() + + if test.authMethod != "" { + // Set up Consul's auth method. + authMethod := &api.ACLAuthMethod{ + Name: testAuthMethod, + Type: "kubernetes", + Description: "Kubernetes Auth Method", + Config: map[string]interface{}{ + "Host": k8sMockServer.URL, + "CACert": serviceAccountCACert, + "ServiceAccountJWT": serviceAccountJWTToken, + }, + Namespace: test.authMethodNamespace, + } + // This will be the case when we are emulating "namespace mirroring" where the + // authMethodNamespace is not equal to the consulServiceNamespace. + if test.authMethodNamespace != test.consulServiceNamespace { + authMethod.Config["MapNamespaces"] = true + } + _, _, err = consulClient.ACL().AuthMethodCreate(authMethod, &api.WriteOptions{Namespace: test.authMethodNamespace}) + require.NoError(t, err) + + // Create the binding rule. + aclBindingRule := api.ACLBindingRule{ + Description: "Kubernetes binding rule", + AuthMethod: testAuthMethod, + BindType: api.BindingRuleBindTypeService, + BindName: "${serviceaccount.name}", + Selector: "serviceaccount.name!=default", + Namespace: test.authMethodNamespace, + } + _, _, err = consulClient.ACL().BindingRuleCreate(&aclBindingRule, &api.WriteOptions{Namespace: test.authMethodNamespace}) + require.NoError(t, err) + } + + // Register Consul services. + testConsulServices := []api.AgentServiceRegistration{consulCountingSvc, consulCountingSvcSidecar} + for _, svc := range testConsulServices { + require.NoError(t, consulClient.Agent().ServiceRegister(&svc)) + } + + ui := cli.NewMockUi() + cmd := Command{ + UI: ui, + bearerTokenFile: bearerFile, + tokenSinkFile: tokenFile, + proxyIDFile: proxyFile, + serviceRegistrationPollingAttempts: 5, + } + // We build the http-addr because normally it's defined by the init container setting + // CONSUL_HTTP_ADDR when it processes the command template. + flags := []string{"-pod-name", testPodName, + "-pod-namespace", testPodNamespace, + "-acl-auth-method", test.authMethod, + "-http-addr", fmt.Sprintf("%s://%s", cfg.Scheme, cfg.Address), + "-consul-service-namespace", test.consulServiceNamespace, + "-auth-method-namespace", test.authMethodNamespace, + } + // Add the CA File if necessary since we're not setting CONSUL_CACERT in test ENV. + if test.tls { + flags = append(flags, "-ca-file", caFile) + } + // Run the command. + code := cmd.Run(flags) + require.Equal(t, 0, code, ui.ErrorWriter.String()) + + if test.authMethod != "" { + // Validate the ACL token was written. + tokenData, err := ioutil.ReadFile(tokenFile) + require.NoError(t, err) + require.NotEmpty(t, tokenData) + + // Check that the token has the metadata with pod name and pod namespace. + consulClient, err = api.NewClient(&api.Config{Address: server.HTTPAddr, Token: string(tokenData), Namespace: test.consulServiceNamespace}) + require.NoError(t, err) + token, _, err := consulClient.ACL().TokenReadSelf(&api.QueryOptions{Namespace: test.authMethodNamespace}) + require.NoError(t, err) + require.Equal(t, "token created via login: {\"pod\":\"default-ns/counting-pod\"}", token.Description) + } + + // Validate contents of proxyFile. + data, err := ioutil.ReadFile(proxyFile) + require.NoError(t, err) + require.Contains(t, string(data), "counting-counting-sidecar-proxy") + }) + } +} + +// The namespace here is default-ns as the k8s-auth method +// relies on the namespace in the response from Kubernetes to +// correctly create the token in the same namespace as the Kubernetes +// namespace which is required when namespace mirroring is enabled. +// Note that this namespace is incorrect for other test cases but +// Consul only cares about this namespace when mirroring is enabled. +const ( + readServiceAccountFoundForNamespaces = `{ + "kind": "ServiceAccount", + "apiVersion": "v1", + "metadata": { + "name": "counting", + "namespace": "default-ns", + "selfLink": "/api/v1/namespaces/default-ns/serviceaccounts/counting", + "uid": "9ff51ff4-557e-11e9-9687-48e6c8b8ecb5", + "resourceVersion": "2101", + "creationTimestamp": "2019-04-02T19:36:34Z" + }, + "secrets": [ + { + "name": "counting-token-m9cvn" + } + ] +}` + + tokenReviewFoundResponseForNamespaces = `{ + "kind": "TokenReview", + "apiVersion": "authentication.k8s.io/v1", + "metadata": { + "creationTimestamp": null + }, + "spec": { + "token": "eyJhbGciOiJSUzI1NiIsImtpZCI6IiJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJkZWZhdWx0Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZWNyZXQubmFtZSI6ImRlbW8tdG9rZW4tbTljdm4iLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC5uYW1lIjoiZGVtbyIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50LnVpZCI6IjlmZjUxZmY0LTU1N2UtMTFlOS05Njg3LTQ4ZTZjOGI4ZWNiNSIsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpkZWZhdWx0OmRlbW8ifQ.UJEphtrN261gy9WCl4ZKjm2PRDLDkc3Xg9VcDGfzyroOqFQ6sog5dVAb9voc5Nc0-H5b1yGwxDViEMucwKvZpA5pi7VEx_OskK-KTWXSmafM0Xg_AvzpU9Ed5TSRno-OhXaAraxdjXoC4myh1ay2DMeHUusJg_ibqcYJrWx-6MO1bH_ObORtAKhoST_8fzkqNAlZmsQ87FinQvYN5mzDXYukl-eeRdBgQUBkWvEb-Ju6cc0-QE4sUQ4IH_fs0fUyX_xc0om0SZGWLP909FTz4V8LxV8kr6L7irxROiS1jn3Fvyc9ur1PamVf3JOPPrOyfmKbaGRiWJM32b3buQw7cg" + }, + "status": { + "authenticated": true, + "user": { + "username": "system:serviceaccount:default-ns:counting", + "uid": "9ff51ff4-557e-11e9-9687-48e6c8b8ecb5", + "groups": [ + "system:serviceaccounts", + "system:serviceaccounts:default-ns", + "system:authenticated" + ] + } + } +}` +) diff --git a/subcommand/inject-connect/command.go b/subcommand/inject-connect/command.go index 4531c6cb4c..36e952c81b 100644 --- a/subcommand/inject-connect/command.go +++ b/subcommand/inject-connect/command.go @@ -49,14 +49,15 @@ type Command struct { flagEnvoyExtraArgs string // Extra envoy args when starting envoy flagLogLevel string - // Flags to support namespaces - flagEnableNamespaces bool // Use namespacing on all components - flagConsulDestinationNamespace string // Consul namespace to register everything if not mirroring - flagAllowK8sNamespacesList []string // K8s namespaces to explicitly inject - flagDenyK8sNamespacesList []string // K8s namespaces to deny injection (has precedence) - flagEnableK8SNSMirroring bool // Enables mirroring of k8s namespaces into Consul - flagK8SNSMirroringPrefix string // Prefix added to Consul namespaces created when mirroring - flagCrossNamespaceACLPolicy string // The name of the ACL policy to add to every created namespace if ACLs are enabled + flagAllowK8sNamespacesList []string // K8s namespaces to explicitly inject + flagDenyK8sNamespacesList []string // K8s namespaces to deny injection (has precedence) + + // Flags to support Consul namespaces + flagEnableNamespaces bool // Use namespacing on all components + flagConsulDestinationNamespace string // Consul namespace to register everything if not mirroring + flagEnableK8SNSMirroring bool // Enables mirroring of k8s namespaces into Consul + flagK8SNSMirroringPrefix string // Prefix added to Consul namespaces created when mirroring + flagCrossNamespaceACLPolicy string // The name of the ACL policy to add to every created namespace if ACLs are enabled // Flags for endpoints controller. flagReleaseName string @@ -383,19 +384,24 @@ func (c *Command) Run(args []string) int { } if err = (&connectinject.EndpointsController{ - Client: mgr.GetClient(), - ConsulClient: c.consulClient, - ConsulScheme: consulURL.Scheme, - ConsulPort: consulURL.Port(), - AllowK8sNamespacesSet: allowK8sNamespaces, - DenyK8sNamespacesSet: denyK8sNamespaces, - Log: ctrl.Log.WithName("controller").WithName("endpoints-controller"), - Scheme: mgr.GetScheme(), - ReleaseName: c.flagReleaseName, - ReleaseNamespace: c.flagReleaseNamespace, - MetricsConfig: metricsConfig, - Context: ctx, - ConsulClientCfg: cfg, + Client: mgr.GetClient(), + ConsulClient: c.consulClient, + ConsulScheme: consulURL.Scheme, + ConsulPort: consulURL.Port(), + AllowK8sNamespacesSet: allowK8sNamespaces, + DenyK8sNamespacesSet: denyK8sNamespaces, + MetricsConfig: metricsConfig, + ConsulClientCfg: cfg, + EnableConsulNamespaces: c.flagEnableNamespaces, + ConsulDestinationNamespace: c.flagConsulDestinationNamespace, + EnableNSMirroring: c.flagEnableK8SNSMirroring, + NSMirroringPrefix: c.flagK8SNSMirroringPrefix, + CrossNSACLPolicy: c.flagCrossNamespaceACLPolicy, + Log: ctrl.Log.WithName("controller").WithName("endpoints"), + Scheme: mgr.GetScheme(), + ReleaseName: c.flagReleaseName, + ReleaseNamespace: c.flagReleaseNamespace, + Context: ctx, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", connectinject.EndpointsController{}) return 1 @@ -417,16 +423,16 @@ func (c *Command) Run(args []string) int { DefaultProxyCPULimit: sidecarProxyCPULimit, DefaultProxyMemoryRequest: sidecarProxyMemoryRequest, DefaultProxyMemoryLimit: sidecarProxyMemoryLimit, + MetricsConfig: metricsConfig, InitContainerResources: initResources, ConsulSidecarResources: consulSidecarResources, - EnableNamespaces: c.flagEnableNamespaces, AllowK8sNamespacesSet: allowK8sNamespaces, DenyK8sNamespacesSet: denyK8sNamespaces, + EnableNamespaces: c.flagEnableNamespaces, ConsulDestinationNamespace: c.flagConsulDestinationNamespace, EnableK8SNSMirroring: c.flagEnableK8SNSMirroring, K8SNSMirroringPrefix: c.flagK8SNSMirroringPrefix, CrossNamespaceACLPolicy: c.flagCrossNamespaceACLPolicy, - MetricsConfig: metricsConfig, Log: logger.Named("handler"), }})