Skip to content

Commit

Permalink
Support Consul ENT Namespaces in the endpoints controller (#475)
Browse files Browse the repository at this point in the history
* Add Consul Enterprise Namespace support to endpoints controller

- In order to support Consul namespaces, the controller now accepts
  values namespacesEnabled, consulDestinationNamespace, mirroringEnabled
and mirroringPrefix which determine which Consul namespace a service and
its proxy should be registered when created.

This behavior is fairly straightforward when registering an endpoint but
is a little trickier when de-registration is concerned. During
de-registration, as we use a consul agent, we need to iterate through
the list of all the namespaces in Consul that we register services
against and create a Client that targets that namespace and agent to
find services registered against the agent in a given namespace.

- Additional changes here require creating the ACL auth-method in the
  default namespace if namespace mirroring is configured and but in the
destination namespace otherwise. This also requires us to explicitly
specify the destination namespace of the service in order to query the
agent for the service being available accurately.

- Update the log format of the endpoints controller to be less verbose
  • Loading branch information
Ashwin Venkatesh authored and ishustava committed Apr 14, 2021
1 parent 6c1f85f commit 04eb04d
Show file tree
Hide file tree
Showing 11 changed files with 1,739 additions and 98 deletions.
7 changes: 5 additions & 2 deletions connect-inject/container_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,15 @@ consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
{{- if .NamespaceMirroringEnabled }}
{{- /* If namespace mirroring is enabled, the auth method is
defined in the default namespace */}}
-namespace="default"
-auth-method-namespace="default" \
{{- else }}
-namespace="{{ .ConsulNamespace }}"
-auth-method-namespace="{{ .ConsulNamespace }}" \
{{- end }}
{{- end }}
{{- end }}
{{- if .ConsulNamespace }}
-consul-service-namespace="{{ .ConsulNamespace }}" \
{{- end }}
# Generate the envoy bootstrap code
/consul/connect-inject/consul connect envoy \
Expand Down
8 changes: 6 additions & 2 deletions connect-inject/container_init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func TestHandlerContainerInit_namespacesEnabled(t *testing.T) {
export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
-consul-service-namespace="default" \
# Generate the envoy bootstrap code
/consul/connect-inject/consul connect envoy \
Expand All @@ -192,6 +193,7 @@ consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
-consul-service-namespace="non-default" \
# Generate the envoy bootstrap code
/consul/connect-inject/consul connect envoy \
Expand All @@ -218,7 +220,8 @@ export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
-acl-auth-method="auth-method" \
-namespace="non-default"
-auth-method-namespace="non-default" \
-consul-service-namespace="non-default" \
# Generate the envoy bootstrap code
/consul/connect-inject/consul connect envoy \
Expand Down Expand Up @@ -247,7 +250,8 @@ export CONSUL_HTTP_ADDR="${HOST_IP}:8500"
export CONSUL_GRPC_ADDR="${HOST_IP}:8502"
consul-k8s connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \
-acl-auth-method="auth-method" \
-namespace="default"
-auth-method-namespace="default" \
-consul-service-namespace="k8snamespace" \
# Generate the envoy bootstrap code
/consul/connect-inject/consul connect envoy \
Expand Down
105 changes: 69 additions & 36 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/deckarep/golang-set"
"github.com/go-logr/logr"
"github.com/hashicorp/consul-k8s/consul"
"github.com/hashicorp/consul-k8s/namespaces"
"github.com/hashicorp/consul/api"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -48,6 +49,25 @@ type EndpointsController struct {
AllowK8sNamespacesSet mapset.Set
// Endpoints in the DenyK8sNamespacesSet are ignored.
DenyK8sNamespacesSet mapset.Set
// EnableConsulNamespaces indicates that a user is running Consul Enterprise
// with version 1.7+ which supports namespaces.
EnableConsulNamespaces bool
// ConsulDestinationNamespace is the name of the Consul namespace to create
// all config entries in. If EnableNSMirroring is true this is ignored.
ConsulDestinationNamespace string
// EnableNSMirroring causes Consul namespaces to be created to match the
// k8s namespace of any config entry custom resource. Config entries will
// be created in the matching Consul namespace.
EnableNSMirroring bool
// NSMirroringPrefix is an optional prefix that can be added to the Consul
// namespaces created while mirroring. For example, if it is set to "k8s-",
// then the k8s `default` namespace will be mirrored in Consul's
// `k8s-default` namespace.
NSMirroringPrefix string
// CrossNSACLPolicy is the name of the ACL policy to attach to
// any created Consul namespaces to allow cross namespace service discovery.
// Only necessary if ACLs are enabled.
CrossNSACLPolicy string
// ReleaseName is the Consul Helm installation release.
ReleaseName string
// ReleaseNamespace is the namespace where Consul is installed.
Expand Down Expand Up @@ -77,11 +97,11 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
}
return ctrl.Result{}, nil
} else if err != nil {
r.Log.Error(err, "failed to get Endpoints from Kubernetes", "name", req.Name, "namespace", req.Namespace)
r.Log.Error(err, "failed to get Endpoints", "name", req.Name, "ns", req.Namespace)
return ctrl.Result{}, err
}

r.Log.Info("retrieved Kubernetes Endpoints", "endpoints", serviceEndpoints.Name, "endpoints-namespace", serviceEndpoints.Namespace)
r.Log.Info("retrieved", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)

// endpointAddressMap stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare
// against service instances in Consul to deregister them if they are not in the map.
Expand All @@ -100,13 +120,13 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
var pod corev1.Pod
objectKey := types.NamespacedName{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace}
if err = r.Client.Get(ctx, objectKey, &pod); err != nil {
r.Log.Error(err, "failed to get pod from Kubernetes", "pod-name", address.TargetRef.Name)
r.Log.Error(err, "failed to get pod", "name", address.TargetRef.Name)
return ctrl.Result{}, err
}

if hasBeenInjected(pod) {
// Create client for Consul agent local to the pod.
client, err := r.remoteConsulClient(pod.Status.HostIP)
client, err := r.remoteConsulClient(pod.Status.HostIP, r.consulNamespace(pod.Namespace))
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.HostIP)
return ctrl.Result{}, err
Expand All @@ -115,38 +135,39 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
// Get information from the pod to create service instance registrations.
serviceRegistration, proxyServiceRegistration, err := r.createServiceRegistrations(pod, serviceEndpoints)
if err != nil {
r.Log.Error(err, "failed to create service registrations", "endpoints", serviceEndpoints.Name)
r.Log.Error(err, "failed to create service registrations for endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
return ctrl.Result{}, err
}

// Register the service instance with the local agent.
// Note: the order of how we register services is important,
// and the connect-proxy service should come after the "main" service
// because its alias health check depends on the main service existing.
r.Log.Info("registering service", "service", serviceRegistration.Name)
r.Log.Info("registering service with Consul", "name", serviceRegistration.Name)
err = client.Agent().ServiceRegister(serviceRegistration)
if err != nil {
r.Log.Error(err, "failed to register service with Consul", "consul-service-name", serviceRegistration.Name)
r.Log.Error(err, "failed to register service", "name", serviceRegistration.Name)
return ctrl.Result{}, err
}

// Register the proxy service instance with the local agent.
r.Log.Info("registering proxy service", "service", proxyServiceRegistration.Name)
r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Name)
err = client.Agent().ServiceRegister(proxyServiceRegistration)
if err != nil {
r.Log.Error(err, "failed to register proxy service with Consul", "consul-proxy-service-name", proxyServiceRegistration.Name)
r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Name)
return ctrl.Result{}, err
}

// Update the TTL health check for the service.
// This is required because ServiceRegister() does not update the TTL if the service already exists.
r.Log.Info("updating ttl health check", "service", serviceRegistration.Name)
r.Log.Info("updating TTL health check for service", "name", serviceRegistration.Name)
status, reason, err := getReadyStatusAndReason(pod)
if err != nil {
return ctrl.Result{}, err
}
err = client.Agent().UpdateTTL(getConsulHealthCheckID(pod, serviceRegistration.ID), reason, status)
if err != nil {
r.Log.Error(err, "failed to update TTL health check", "name", serviceRegistration.Name)
return ctrl.Result{}, err
}
}
Expand All @@ -158,7 +179,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (
// from Consul. This uses endpointAddressMap which is populated with the addresses in the Endpoints object during
// the registration codepath.
if err = r.deregisterServiceOnAllAgents(ctx, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap); err != nil {
r.Log.Error(err, "failed to deregister service instances on all agents", "k8s-service-name", serviceEndpoints.Name, "k8s-namespace", serviceEndpoints.Namespace)
r.Log.Error(err, "failed to deregister endpoints on all agents", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -240,7 +261,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
Port: servicePort,
Address: pod.Status.PodIP,
Meta: meta,
Namespace: "", // TODO: namespace support
Namespace: r.consulNamespace(pod.Namespace),
Check: &api.AgentServiceCheck{
CheckID: getConsulHealthCheckID(pod, serviceID),
Name: "Kubernetes Health Check",
Expand Down Expand Up @@ -299,7 +320,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
Address: pod.Status.PodIP,
TaggedAddresses: nil, // TODO: set cluster IP here (will be done later)
Meta: meta,
Namespace: "", // TODO: same as service namespace
Namespace: r.consulNamespace(pod.Namespace),
Proxy: proxyConfig,
Checks: api.AgentServiceChecks{
{
Expand Down Expand Up @@ -359,9 +380,8 @@ func getReadyStatusAndReason(pod corev1.Pod) (string, string, error) {
// them only if they are not in endpointsAddressesMap. If the map is nil, it will deregister all instances. If the map
// has addresses, it will only deregister instances not in the map.
func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error {

// Get all agents by getting pods with label component=client, app=consul and release=<ReleaseName>
list := corev1.PodList{}
agents := corev1.PodList{}
listOptions := client.ListOptions{
Namespace: r.ReleaseNamespace,
LabelSelector: labels.SelectorFromSet(map[string]string{
Expand All @@ -370,24 +390,23 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context,
"release": r.ReleaseName,
}),
}
if err := r.Client.List(ctx, &list, &listOptions); err != nil {
r.Log.Error(err, "failed to get agent pods from Kubernetes")
if err := r.Client.List(ctx, &agents, &listOptions); err != nil {
r.Log.Error(err, "failed to get Consul client agent pods")
return err
}

// On each agent, we need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
for _, pod := range list.Items {
// Create client for this agent.
client, err := r.remoteConsulClient(pod.Status.PodIP)
for _, agent := range agents.Items {
client, err := r.remoteConsulClient(agent.Status.PodIP, r.consulNamespace(k8sSvcNamespace))
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.PodIP)
r.Log.Error(err, "failed to create a new Consul client", "address", agent.Status.PodIP)
return err
}

// Get services matching metadata.
svcs, err := serviceInstancesForK8SServiceNameAndNamespace(k8sSvcName, k8sSvcNamespace, client)
if err != nil {
r.Log.Error(err, "failed to get service instances", MetaKeyKubeServiceName, k8sSvcName)
r.Log.Error(err, "failed to get service instances", "name", k8sSvcName)
return err
}

Expand All @@ -399,13 +418,13 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context,
if _, ok := endpointsAddressesMap[serviceRegistration.Address]; !ok {
// If the service address is not in the Endpoints addresses, deregister it.
if err = client.Agent().ServiceDeregister(svcID); err != nil {
r.Log.Error(err, "failed to deregister service instance", "consul-service-id", svcID)
r.Log.Error(err, "failed to deregister service instance", "id", svcID)
return err
}
}
} else {
if err = client.Agent().ServiceDeregister(svcID); err != nil {
r.Log.Error(err, "failed to deregister service instance", "consul-service-id", svcID)
r.Log.Error(err, "failed to deregister service instance", "id", svcID)
return err
}
}
Expand All @@ -430,17 +449,25 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream,
for _, raw := range strings.Split(raw, ",") {
parts := strings.SplitN(raw, ":", 3)

var datacenter, serviceName, preparedQuery string
var datacenter, serviceName, preparedQuery, namespace string
var port int32
if strings.TrimSpace(parts[0]) == "prepared_query" {
port, _ = portValue(pod, strings.TrimSpace(parts[2]))
preparedQuery = strings.TrimSpace(parts[1])
} else {
port, _ = portValue(pod, strings.TrimSpace(parts[1]))

// TODO: Parse the namespace if provided

serviceName = strings.TrimSpace(parts[0])
// If Consul Namespaces are enabled, attempt to parse the
// upstream for a namespace.
if r.EnableConsulNamespaces {
pieces := strings.SplitN(parts[0], ".", 2)
serviceName = strings.TrimSpace(pieces[0])
if len(pieces) > 1 {
namespace = strings.TrimSpace(pieces[1])
}
} else {
serviceName = strings.TrimSpace(parts[0])
}

// parse the optional datacenter
if len(parts) > 2 {
Expand Down Expand Up @@ -469,7 +496,7 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream,
if port > 0 {
upstream := api.Upstream{
DestinationType: api.UpstreamDestTypeService,
DestinationNamespace: "", // todo
DestinationNamespace: namespace,
DestinationName: serviceName,
Datacenter: datacenter,
LocalBindPort: int(port),
Expand All @@ -488,12 +515,12 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream,
return upstreams, nil
}

// remoteConsulClient returns an *api.Client that points at the consul agent local to the pod.
func (r *EndpointsController) remoteConsulClient(ip string) (*api.Client, error) {
// remoteConsulClient returns an *api.Client that points at the consul agent local to the pod for a provided namespace.
func (r *EndpointsController) remoteConsulClient(ip string, namespace string) (*api.Client, error) {
newAddr := fmt.Sprintf("%s://%s:%s", r.ConsulScheme, ip, r.ConsulPort)
localConfig := r.ConsulClientCfg
localConfig.Address = newAddr

localConfig.Namespace = namespace
return consul.NewClient(localConfig)
}

Expand Down Expand Up @@ -553,28 +580,28 @@ func (r EndpointsController) filterAgentPods(object client.Object) bool {
// for client agent pods where the Ready condition is true.
func (r EndpointsController) requestsForRunningAgentPods(object client.Object) []ctrl.Request {
var consulClientPod corev1.Pod
r.Log.Info("received update for consulClientPod", "podName", object.GetName())
r.Log.Info("received update for Consul client pod", "name", object.GetName())
err := r.Client.Get(r.Context, types.NamespacedName{Name: object.GetName(), Namespace: object.GetNamespace()}, &consulClientPod)
if k8serrors.IsNotFound(err) {
// Ignore if consulClientPod is not found.
return []ctrl.Request{}
}
if err != nil {
r.Log.Error(err, "failed to get consulClientPod", "consulClientPod", consulClientPod.Name)
r.Log.Error(err, "failed to get Consul client pod", "name", consulClientPod.Name)
return []ctrl.Request{}
}
// We can ignore the agent pod if it's not running, since
// we can't reconcile and register/deregister services against that agent.
if consulClientPod.Status.Phase != corev1.PodRunning {
r.Log.Info("ignoring consulClientPod because it's not running", "consulClientPod", consulClientPod.Name)
r.Log.Info("ignoring Consul client pod because it's not running", "name", consulClientPod.Name)
return []ctrl.Request{}
}
// We can ignore the agent pod if it's not yet ready, since
// we can't reconcile and register/deregister services against that agent.
for _, cond := range consulClientPod.Status.Conditions {
if cond.Type == corev1.PodReady && cond.Status != corev1.ConditionTrue {
// Ignore if consulClientPod is not ready.
r.Log.Info("ignoring consulClientPod because it's not ready", "consulClientPod", consulClientPod.Name)
r.Log.Info("ignoring Consul client pod because it's not ready", "name", consulClientPod.Name)
return []ctrl.Request{}
}
}
Expand Down Expand Up @@ -605,6 +632,12 @@ func (r EndpointsController) requestsForRunningAgentPods(object client.Object) [
return requests
}

// consulNamespace returns the Consul destination namespace for a provided Kubernetes namespace
// depending on Consul Namespaces being enabled and the value of namespace mirroring.
func (r *EndpointsController) consulNamespace(namespace string) string {
return namespaces.ConsulNamespace(namespace, r.EnableConsulNamespaces, r.ConsulDestinationNamespace, r.EnableNSMirroring, r.NSMirroringPrefix)
}

// hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected.
func hasBeenInjected(pod corev1.Pod) bool {
if anno, ok := pod.Annotations[keyInjectStatus]; ok {
Expand Down
Loading

0 comments on commit 04eb04d

Please sign in to comment.