Skip to content

Commit

Permalink
Add Consul Enterprise Namespace support to endpoints controller
Browse files Browse the repository at this point in the history
- In order to support Consul namespaces, the controller now accepts
  values namespacesEnabled, consulDestinationNamespace, mirroringEnabled
and mirroringPrefix which determine which Consul namespace a service and
it's proxy should be registered in when created.

This behavior is fairly straightforward when registering an endpoint but
is a little tricker when de-registration is concerned. During
de-registration, as we use a consul agent, we need to iterate through
the list of all the namespaces in Consul that we register services
against and create a Client that targets that namespace and agent to
find services registered against the agent in a given namespace.

- Additional changes here require the creating the ACL authmethod in the
  default namespace if namespace mirroring is configured and but in the
destination namespace otherwise. This also requires us to explicitly
specify the destination namespace of the service in order to query the
agent for the service being available accurately.
  • Loading branch information
Ashwin Venkatesh committed Apr 8, 2021
1 parent 2f144ec commit 7c3d03a
Show file tree
Hide file tree
Showing 9 changed files with 1,406 additions and 97 deletions.
7 changes: 5 additions & 2 deletions connect-inject/container_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,15 @@ consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
{{- if .NamespaceMirroringEnabled }}
{{- /* If namespace mirroring is enabled, the auth method is
defined in the default namespace */}}
-namespace="default"
-auth-method-namespace="default" \
{{- else }}
-namespace="{{ .ConsulNamespace }}"
-auth-method-namespace="{{ .ConsulNamespace }}" \
{{- end }}
{{- end }}
{{- end }}
{{- if .ConsulNamespace }}
-service-namespace="{{ .ConsulNamespace }}" \
{{- end }}
# Generate the envoy bootstrap code
/consul/connect-inject/consul connect envoy \
Expand Down
8 changes: 6 additions & 2 deletions connect-inject/container_init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func TestHandlerContainerInit_namespacesEnabled(t *testing.T) {
export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
-service-namespace="default" \
# Generate the envoy bootstrap code
/consul/connect-inject/consul connect envoy \
Expand All @@ -192,6 +193,7 @@ consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
-service-namespace="non-default" \
# Generate the envoy bootstrap code
/consul/connect-inject/consul connect envoy \
Expand All @@ -218,7 +220,8 @@ export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
-acl-auth-method="auth-method" \
-namespace="non-default"
-auth-method-namespace="non-default" \
-service-namespace="non-default" \
# Generate the envoy bootstrap code
/consul/connect-inject/consul connect envoy \
Expand Down Expand Up @@ -247,7 +250,8 @@ export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
-acl-auth-method="auth-method" \
-namespace="default"
-auth-method-namespace="default" \
-service-namespace="k8snamespace" \
# Generate the envoy bootstrap code
/consul/connect-inject/consul connect envoy \
Expand Down
132 changes: 96 additions & 36 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/deckarep/golang-set"
"github.com/go-logr/logr"
"github.com/hashicorp/consul-k8s/consul"
"github.com/hashicorp/consul-k8s/namespaces"
"github.com/hashicorp/consul/api"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -48,6 +49,25 @@ type EndpointsController struct {
AllowK8sNamespacesSet mapset.Set
// Endpoints in the DenyK8sNamespacesSet are ignored.
DenyK8sNamespacesSet mapset.Set
// EnableConsulNamespaces indicates that a user is running Consul Enterprise
// with version 1.7+ which supports namespaces.
EnableConsulNamespaces bool
// ConsulDestinationNamespace is the name of the Consul namespace to create
// all config entries in. If EnableNSMirroring is true this is ignored.
ConsulDestinationNamespace string
// EnableNSMirroring causes Consul namespaces to be created to match the
// k8s namespace of any config entry custom resource. Config entries will
// be created in the matching Consul namespace.
EnableNSMirroring bool
// NSMirroringPrefix is an optional prefix that can be added to the Consul
// namespaces created while mirroring. For example, if it is set to "k8s-",
// then the k8s `default` namespace will be mirrored in Consul's
// `k8s-default` namespace.
NSMirroringPrefix string
// CrossNSACLPolicy is the name of the ACL policy to attach to
// any created Consul namespaces to allow cross namespace service discovery.
// Only necessary if ACLs are enabled.
CrossNSACLPolicy string
// ReleaseName is the Consul Helm installation release.
ReleaseName string
// ReleaseNamespace is the namespace where Consul is installed.
Expand Down Expand Up @@ -81,7 +101,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, err
}

r.Log.Info("retrieved Kubernetes Endpoints", "endpoints", serviceEndpoints.Name, "endpoints-namespace", serviceEndpoints.Namespace)
r.Log.Info("retrieved", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)

// endpointAddressMap stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare
// against service instances in Consul to deregister them if they are not in the map.
Expand All @@ -106,7 +126,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (

if hasBeenInjected(pod) {
// Create client for Consul agent local to the pod.
client, err := r.remoteConsulClient(pod.Status.HostIP)
client, err := r.remoteConsulClient(pod.Status.HostIP, r.consulNamespace(pod.Namespace))
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.HostIP)
return ctrl.Result{}, err
Expand Down Expand Up @@ -240,7 +260,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
Port: servicePort,
Address: pod.Status.PodIP,
Meta: meta,
Namespace: "", // TODO: namespace support
Namespace: r.consulNamespace(pod.Namespace),
Check: &api.AgentServiceCheck{
CheckID: getConsulHealthCheckID(pod, serviceID),
Name: "Kubernetes Health Check",
Expand Down Expand Up @@ -299,7 +319,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
Address: pod.Status.PodIP,
TaggedAddresses: nil, // TODO: set cluster IP here (will be done later)
Meta: meta,
Namespace: "", // TODO: same as service namespace
Namespace: r.consulNamespace(pod.Namespace),
Proxy: proxyConfig,
Checks: api.AgentServiceChecks{
{
Expand Down Expand Up @@ -384,43 +404,69 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context,

// On each agent, we need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
for _, pod := range list.Items {
// Create client for this agent.
client, err := r.remoteConsulClient(pod.Status.PodIP)
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.PodIP)
return err
}
// Iterate through each namespace from the list of namespaces.
for _, ns := range r.consulNamespaceList() {
// Create client for above agent and namespace combination.
client, err := r.remoteConsulClient(pod.Status.PodIP, ns)
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.PodIP)
return err
}

// Get services matching metadata.
svcs, err := serviceInstancesForK8SServiceNameAndNamespace(k8sSvcName, k8sSvcNamespace, client)
if err != nil {
r.Log.Error(err, "failed to get service instances", MetaKeyKubeServiceName, k8sSvcName)
return err
}
// Get services matching metadata.
svcs, err := serviceInstancesForK8SServiceNameAndNamespace(k8sSvcName, k8sSvcNamespace, client)
if err != nil {
r.Log.Error(err, "failed to get service instances", MetaKeyKubeServiceName, k8sSvcName)
return err
}

// Deregister each service instance that matches the metadata.
for svcID, serviceRegistration := range svcs {
// If we selectively deregister, only deregister if the address is not in the map. Otherwise, deregister
// every service instance.
if endpointsAddressesMap != nil {
if _, ok := endpointsAddressesMap[serviceRegistration.Address]; !ok {
// If the service address is not in the Endpoints addresses, deregister it.
// Deregister each service instance that matches the metadata.
for svcID, serviceRegistration := range svcs {
// If we selectively deregister, only deregister if the address is not in the map. Otherwise, deregister
// every service instance.
if endpointsAddressesMap != nil {
if _, ok := endpointsAddressesMap[serviceRegistration.Address]; !ok {
// If the service address is not in the Endpoints addresses, deregister it.
if err = client.Agent().ServiceDeregister(svcID); err != nil {
r.Log.Error(err, "failed to deregister service instance", "consul-service-id", svcID)
return err
}
}
} else {
if err = client.Agent().ServiceDeregister(svcID); err != nil {
r.Log.Error(err, "failed to deregister service instance", "consul-service-id", svcID)
return err
}
}
} else {
if err = client.Agent().ServiceDeregister(svcID); err != nil {
r.Log.Error(err, "failed to deregister service instance", "consul-service-id", svcID)
return err
}
}
}
}
return nil
}

// consulNamespaceList returns a list of the Consul namespaces where services registered
// from the Kubernetes cluster exist in Consul. It returns "" when namespaces are enabled,
// the ConsulDestinationNamespace if mirroring is disabled and all the namespaces if mirroring
// is enabled.
func (r *EndpointsController) consulNamespaceList() []string {
if r.EnableConsulNamespaces {
if r.EnableNSMirroring {
var namespaces []string
ns, _, err := r.ConsulClient.Namespaces().List(nil)
if err != nil {
r.Log.Error(err, "failed to list Consul namespaces")
}
for _, n := range ns {
namespaces = append(namespaces, n.Name)
}
return namespaces
} else {
return []string{r.ConsulDestinationNamespace}
}
}
return []string{""}
}

// serviceInstancesForK8SServiceNameAndNamespace calls Consul's ServicesWithFilter to get the list
// of services instances that have the provided k8sServiceName and k8sServiceNamespace in their metadata.
func serviceInstancesForK8SServiceNameAndNamespace(k8sServiceName, k8sServiceNamespace string, client *api.Client) (map[string]*api.AgentService, error) {
Expand All @@ -437,17 +483,25 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream,
for _, raw := range strings.Split(raw, ",") {
parts := strings.SplitN(raw, ":", 3)

var datacenter, serviceName, preparedQuery string
var datacenter, serviceName, preparedQuery, namespace string
var port int32
if strings.TrimSpace(parts[0]) == "prepared_query" {
port, _ = portValue(pod, strings.TrimSpace(parts[2]))
preparedQuery = strings.TrimSpace(parts[1])
} else {
port, _ = portValue(pod, strings.TrimSpace(parts[1]))

// TODO: Parse the namespace if provided

serviceName = strings.TrimSpace(parts[0])
// If Consul Namespaces are enabled, attempt to parse the
// upstream for a namespace.
if r.EnableConsulNamespaces {
pieces := strings.SplitN(parts[0], ".", 2)
serviceName = strings.TrimSpace(pieces[0])
if len(pieces) > 1 {
namespace = strings.TrimSpace(pieces[1])
}
} else {
serviceName = strings.TrimSpace(parts[0])
}

// parse the optional datacenter
if len(parts) > 2 {
Expand Down Expand Up @@ -476,7 +530,7 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream,
if port > 0 {
upstream := api.Upstream{
DestinationType: api.UpstreamDestTypeService,
DestinationNamespace: "", // todo
DestinationNamespace: namespace,
DestinationName: serviceName,
Datacenter: datacenter,
LocalBindPort: int(port),
Expand All @@ -495,12 +549,12 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream,
return upstreams, nil
}

// remoteConsulClient returns an *api.Client that points at the consul agent local to the pod.
func (r *EndpointsController) remoteConsulClient(ip string) (*api.Client, error) {
// remoteConsulClient returns an *api.Client that points at the consul agent local to the pod for a provided namespace.
func (r *EndpointsController) remoteConsulClient(ip string, namespace string) (*api.Client, error) {
newAddr := fmt.Sprintf("%s://%s:%s", r.ConsulScheme, ip, r.ConsulPort)
localConfig := r.ConsulClientCfg
localConfig.Address = newAddr

localConfig.Namespace = namespace
return consul.NewClient(localConfig)
}

Expand Down Expand Up @@ -612,6 +666,12 @@ func (r EndpointsController) requestsForRunningAgentPods(object client.Object) [
return requests
}

// consulNamespace returns the Consul destination namespace for a provided Kubernetes namespace
// depending on Consul Namespaces being enabled and the value of namespace mirroring.
func (r *EndpointsController) consulNamespace(namespace string) string {
return namespaces.ConsulNamespace(namespace, r.EnableConsulNamespaces, r.ConsulDestinationNamespace, r.EnableNSMirroring, r.NSMirroringPrefix)
}

// hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected.
func hasBeenInjected(pod corev1.Pod) bool {
if anno, ok := pod.Annotations[annotationStatus]; ok {
Expand Down
Loading

0 comments on commit 7c3d03a

Please sign in to comment.