Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add acls and tls to endpoints controller #470

Merged
merged 8 commits into from
Apr 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type EndpointsController struct {
client.Client
// ConsulClient points at the agent local to the connect-inject deployment pod.
ConsulClient *api.Client
// ConsulClientCfg is the client config used by the ConsulClient when calling NewClient().
ConsulClientCfg *api.Config
// ConsulScheme is the scheme to use when making API calls to Consul,
// i.e. "http" or "https".
ConsulScheme string
Expand Down Expand Up @@ -100,7 +102,7 @@ func (r *EndpointsController) Reconcile(ctx context.Context, req ctrl.Request) (

if hasBeenInjected(pod) {
// Create client for Consul agent local to the pod.
client, err := r.getConsulClient(pod.Status.HostIP)
client, err := r.remoteConsulClient(pod.Status.HostIP)
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.HostIP)
return ctrl.Result{}, err
Expand Down Expand Up @@ -300,7 +302,7 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(ctx context.Context,
// On each agent, we need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
for _, pod := range list.Items {
// Create client for this agent.
client, err := r.getConsulClient(pod.Status.PodIP)
client, err := r.remoteConsulClient(pod.Status.PodIP)
if err != nil {
r.Log.Error(err, "failed to create a new Consul client", "address", pod.Status.PodIP)
return err
Expand Down Expand Up @@ -410,10 +412,10 @@ func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream,
return upstreams, nil
}

// getConsulClient returns an *api.Client that points at the consul agent local to the pod.
func (r *EndpointsController) getConsulClient(ip string) (*api.Client, error) {
// remoteConsulClient returns an *api.Client that points at the consul agent local to the pod.
func (r *EndpointsController) remoteConsulClient(ip string) (*api.Client, error) {
newAddr := fmt.Sprintf("%s://%s:%s", r.ConsulScheme, ip, r.ConsulPort)
localConfig := api.DefaultConfig()
localConfig := r.ConsulClientCfg
localConfig.Address = newAddr

return consul.NewClient(localConfig)
Expand Down
242 changes: 169 additions & 73 deletions connect-inject/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"strings"
"testing"

mapset "github.com/deckarep/golang-set"
"github.com/deckarep/golang-set"
logrtest "github.com/go-logr/logr/testing"
"github.com/hashicorp/consul-k8s/subcommand/common"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -106,6 +107,74 @@ func TestHasBeenInjected(t *testing.T) {
}
}

// TestProcessUpstreamsTLSandACLs enables TLS and ACLS and tests processUpstreams through
// the only path which sets up and uses a consul client: when proxy defaults need to be read.
// This test was plucked from the table test TestProcessUpstreams as the rest do not use the client.
func TestProcessUpstreamsTLSandACLs(t *testing.T) {
t.Parallel()
nodeName := "test-node"

masterToken := "b78d37c7-0ca7-5f4d-99ee-6d9975ce4586"
caFile, certFile, keyFile := common.GenerateServerCerts(t)
// Create test consul server with ACLs and TLS
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
c.ACL.Enabled = true
c.ACL.DefaultPolicy = "deny"
c.ACL.Tokens.Master = masterToken
c.CAFile = caFile
c.CertFile = certFile
c.KeyFile = keyFile
c.NodeName = nodeName
})
require.NoError(t, err)
defer consul.Stop()

consul.WaitForSerfCheck(t)
cfg := &api.Config{
Address: consul.HTTPSAddr,
Scheme: "https",
TLSConfig: api.TLSConfig{
CAFile: caFile,
},
Token: masterToken,
}
consulClient, err := api.NewClient(cfg)
require.NoError(t, err)
addr := strings.Split(consul.HTTPSAddr, ":")
consulPort := addr[1]

ce, _ := api.MakeConfigEntry(api.ProxyDefaults, "global")
pd := ce.(*api.ProxyConfigEntry)
pd.MeshGateway.Mode = api.MeshGatewayModeRemote
_, _, err = consulClient.ConfigEntries().Set(pd, &api.WriteOptions{})
require.NoError(t, err)

ep := &EndpointsController{
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
ConsulScheme: "https",
AllowK8sNamespacesSet: mapset.NewSetWith("*"),
DenyK8sNamespacesSet: mapset.NewSetWith(),
}

pod := createPod("pod1", "1.2.3.4", true)
pod.Annotations[annotationUpstreams] = "upstream1:1234:dc1"

upstreams, err := ep.processUpstreams(*pod)
require.NoError(t, err)

expected := []api.Upstream{
{
DestinationType: api.UpstreamDestTypeService,
DestinationName: "upstream1",
Datacenter: "dc1",
LocalBindPort: 1234,
},
}
require.Equal(t, expected, upstreams)
}

func TestProcessUpstreams(t *testing.T) {
t.Parallel()
nodeName := "test-node"
Expand Down Expand Up @@ -617,11 +686,12 @@ func TestReconcileCreateEndpoint(t *testing.T) {
})
require.NoError(t, err)
defer consul.Stop()

consul.WaitForLeader(t)
consulClient, err := api.NewClient(&api.Config{

cfg := &api.Config{
Address: consul.HTTPAddr,
})
}
consulClient, err := api.NewClient(cfg)
require.NoError(t, err)
addr := strings.Split(consul.HTTPAddr, ":")
consulPort := addr[1]
Expand All @@ -643,6 +713,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
ConsulClientCfg: cfg,
}
namespacedName := types.NamespacedName{
Namespace: "default",
Expand Down Expand Up @@ -704,7 +775,7 @@ func TestReconcileCreateEndpoint(t *testing.T) {
// For the register and deregister codepath, this also tests that they work when the Consul service name is different
// from the K8s service name.
// This test covers EndpointsController.deregisterServiceOnAllAgents when services should be selectively deregistered
// since the map will not be nil.
// since the map will not be nil. This test also runs each test with ACLs+TLS enabled and disabled, since it covers all the cases where a Consul client is created.
func TestReconcileUpdateEndpoint(t *testing.T) {
t.Parallel()
nodeName := "test-node"
Expand Down Expand Up @@ -1201,79 +1272,102 @@ func TestReconcileUpdateEndpoint(t *testing.T) {
expectedProxySvcInstances: []*api.CatalogService{},
},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
// The agent pod needs to have the address 127.0.0.1 so when the
// code gets the agent pods via the label component=client, and
// makes requests against the agent API, it will actually hit the
// test server we have on localhost.
fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false)
fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"}
for _, secure := range []bool{true, false} {
for _, tt := range cases {
t.Run(fmt.Sprintf("%s - secure: %v", tt.name, secure), func(t *testing.T) {
// The agent pod needs to have the address 127.0.0.1 so when the
// code gets the agent pods via the label component=client, and
// makes requests against the agent API, it will actually hit the
// test server we have on localhost.
fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false)
fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"}

// Create fake k8s client
k8sObjects := append(tt.k8sObjects(), fakeClientPod)
fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build()
// Create fake k8s client
k8sObjects := append(tt.k8sObjects(), fakeClientPod)
fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build()

// Create test consul server
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
c.NodeName = nodeName
})
require.NoError(t, err)
defer consul.Stop()

consul.WaitForLeader(t)
consulClient, err := api.NewClient(&api.Config{
Address: consul.HTTPAddr,
})
require.NoError(t, err)
addr := strings.Split(consul.HTTPAddr, ":")
consulPort := addr[1]
masterToken := "b78d37c7-0ca7-5f4d-99ee-6d9975ce4586"
caFile, certFile, keyFile := common.GenerateServerCerts(t)
// Create test consul server, with ACLs+TLS if necessary
consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) {
if secure {
c.ACL.Enabled = true
c.ACL.DefaultPolicy = "deny"
c.ACL.Tokens.Master = masterToken
c.CAFile = caFile
c.CertFile = certFile
c.KeyFile = keyFile
}
c.NodeName = nodeName
})
require.NoError(t, err)
defer consul.Stop()
consul.WaitForSerfCheck(t)
kschoche marked this conversation as resolved.
Show resolved Hide resolved

// Register service and proxy in consul
for _, svc := range tt.initialConsulSvcs {
err = consulClient.Agent().ServiceRegister(svc)
cfg := &api.Config{
Scheme: "http",
Address: consul.HTTPAddr,
}
if secure {
cfg.Address = consul.HTTPSAddr
cfg.Scheme = "https"
cfg.TLSConfig = api.TLSConfig{
CAFile: caFile,
}
cfg.Token = masterToken
}
consulClient, err := api.NewClient(cfg)
require.NoError(t, err)
}
addr := strings.Split(cfg.Address, ":")
consulPort := addr[1]

// Create the endpoints controller
ep := &EndpointsController{
Client: fakeClient,
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
ConsulScheme: "http",
AllowK8sNamespacesSet: mapset.NewSetWith("*"),
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
}
namespacedName := types.NamespacedName{
Namespace: "default",
Name: "service-updated",
}
// Register service and proxy in consul
for _, svc := range tt.initialConsulSvcs {
err = consulClient.Agent().ServiceRegister(svc)
require.NoError(t, err)
}

resp, err := ep.Reconcile(context.Background(), ctrl.Request{
NamespacedName: namespacedName,
})
require.NoError(t, err)
require.False(t, resp.Requeue)
// Create the endpoints controller
ep := &EndpointsController{
Client: fakeClient,
Log: logrtest.TestLogger{T: t},
ConsulClient: consulClient,
ConsulPort: consulPort,
ConsulScheme: cfg.Scheme,
AllowK8sNamespacesSet: mapset.NewSetWith("*"),
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
ConsulClientCfg: cfg,
}
namespacedName := types.NamespacedName{
Namespace: "default",
Name: "service-updated",
}

// After reconciliation, Consul should have service-updated with the correct number of instances
serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", nil)
require.NoError(t, err)
require.Len(t, serviceInstances, tt.expectedNumSvcInstances)
for i, instance := range serviceInstances {
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceID, instance.ServiceID)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceAddress, instance.ServiceAddress)
}
proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", tt.consulSvcName), "", nil)
require.NoError(t, err)
require.Len(t, proxyServiceInstances, tt.expectedNumSvcInstances)
for i, instance := range proxyServiceInstances {
require.Equal(t, tt.expectedProxySvcInstances[i].ServiceID, instance.ServiceID)
require.Equal(t, tt.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress)
}
})
resp, err := ep.Reconcile(context.Background(), ctrl.Request{
NamespacedName: namespacedName,
})
require.NoError(t, err)
require.False(t, resp.Requeue)

// After reconciliation, Consul should have service-updated with the correct number of instances
serviceInstances, _, err := consulClient.Catalog().Service(tt.consulSvcName, "", nil)
require.NoError(t, err)
require.Len(t, serviceInstances, tt.expectedNumSvcInstances)
for i, instance := range serviceInstances {
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceID, instance.ServiceID)
require.Equal(t, tt.expectedConsulSvcInstances[i].ServiceAddress, instance.ServiceAddress)
}
proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", tt.consulSvcName), "", nil)
require.NoError(t, err)
require.Len(t, proxyServiceInstances, tt.expectedNumSvcInstances)
for i, instance := range proxyServiceInstances {
require.Equal(t, tt.expectedProxySvcInstances[i].ServiceID, instance.ServiceID)
require.Equal(t, tt.expectedProxySvcInstances[i].ServiceAddress, instance.ServiceAddress)
}
})
}
}
}

Expand Down Expand Up @@ -1358,9 +1452,10 @@ func TestReconcileDeleteEndpoint(t *testing.T) {
defer consul.Stop()

consul.WaitForLeader(t)
consulClient, err := api.NewClient(&api.Config{
cfg := &api.Config{
Address: consul.HTTPAddr,
})
}
consulClient, err := api.NewClient(cfg)
require.NoError(t, err)
addr := strings.Split(consul.HTTPAddr, ":")
consulPort := addr[1]
Expand All @@ -1382,6 +1477,7 @@ func TestReconcileDeleteEndpoint(t *testing.T) {
DenyK8sNamespacesSet: mapset.NewSetWith(),
ReleaseName: "consul",
ReleaseNamespace: "default",
ConsulClientCfg: cfg,
}

// Set up the Endpoint that will be reconciled, and reconcile
Expand Down
1 change: 1 addition & 0 deletions subcommand/inject-connect/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ func (c *Command) Run(args []string) int {
ReleaseName: c.flagReleaseName,
ReleaseNamespace: c.flagReleaseNamespace,
Context: ctx,
ConsulClientCfg: cfg,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", connectinject.EndpointsController{})
return 1
Expand Down