Merge pull request #2806 from wfernandes/workload-cluster-unit-tests
🏃Add workload cluster unit tests in KCP
k8s-ci-robot committed Mar 30, 2020
2 parents 3288e3e + d821cd5 commit 3f0710b
Showing 7 changed files with 1,170 additions and 342 deletions.
4 changes: 3 additions & 1 deletion controlplane/kubeadm/internal/etcd/fake/client.go
@@ -31,13 +31,15 @@ type FakeEtcdClient struct {
MoveLeaderResponse *clientv3.MoveLeaderResponse
StatusResponse *clientv3.StatusResponse
ErrorResponse error
MovedLeader uint64
}

func (c *FakeEtcdClient) Endpoints() []string {
return c.EtcdEndpoints
}

func (c *FakeEtcdClient) MoveLeader(_ context.Context, _ uint64) (*clientv3.MoveLeaderResponse, error) {
func (c *FakeEtcdClient) MoveLeader(_ context.Context, i uint64) (*clientv3.MoveLeaderResponse, error) {
c.MovedLeader = i
return c.MoveLeaderResponse, c.ErrorResponse
}

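The new MovedLeader field turns FakeEtcdClient into a recording double: a test can assert which etcd member ID a leader move targeted. A minimal sketch of such a test (hypothetical, not part of this commit; it relies only on the method and fields shown above):

package fake_test

import (
	"context"
	"testing"

	fake "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd/fake"
)

func TestMoveLeaderRecordsTargetMember(t *testing.T) {
	// MoveLeaderResponse and ErrorResponse are left nil; the fake simply echoes them back.
	c := &fake.FakeEtcdClient{}

	if _, err := c.MoveLeader(context.Background(), 12345); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// MovedLeader captures the member ID passed to MoveLeader.
	if c.MovedLeader != 12345 {
		t.Fatalf("expected MovedLeader to be 12345, got %d", c.MovedLeader)
	}
}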
243 changes: 6 additions & 237 deletions controlplane/kubeadm/internal/workload_cluster.go
@@ -36,8 +36,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha3"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
etcdutil "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd/util"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/certs"
"sigs.k8s.io/cluster-api/util/patch"
@@ -46,17 +44,14 @@ import (

const (
kubeProxyKey = "kube-proxy"
kubeadmConfigKey = "kubeadm-config"
labelNodeRoleMaster = "node-role.kubernetes.io/master"
)

var (
ErrControlPlaneMinNodes = errors.New("cluster has fewer than 2 control plane nodes; removing an etcd member is not supported")
)

type etcdClientFor interface {
forNode(ctx context.Context, name string) (*etcd.Client, error)
}

// WorkloadCluster defines all behaviors necessary to upgrade kubernetes on a workload cluster
type WorkloadCluster interface {
// Basic health and status checks.
@@ -153,162 +148,6 @@ func (w *Workload) ControlPlaneIsHealthy(ctx context.Context) (HealthCheckResult
return response, nil
}

// removeMemberForNode removes the etcd member for the node. Removing the etcd
// member when the cluster has one control plane node is not supported. To allow
// the removal of a failed etcd member, the etcd API requests are sent to a
// different node.
func (w *Workload) removeMemberForNode(ctx context.Context, name string) error {
// Pick a different node to talk to etcd
controlPlaneNodes, err := w.getControlPlaneNodes(ctx)
if err != nil {
return err
}
if len(controlPlaneNodes.Items) < 2 {
return ErrControlPlaneMinNodes
}
anotherNode := firstNodeNotMatchingName(name, controlPlaneNodes.Items)
if anotherNode == nil {
return errors.Errorf("failed to find a control plane node whose name is not %s", name)
}
etcdClient, err := w.etcdClientGenerator.forNode(ctx, anotherNode.Name)
if err != nil {
return errors.Wrap(err, "failed to create etcd client")
}

// List etcd members. This checks that the member is healthy, because the request goes through consensus.
members, err := etcdClient.Members(ctx)
if err != nil {
return errors.Wrap(err, "failed to list etcd members using etcd client")
}
member := etcdutil.MemberForName(members, name)

// The member has already been removed, return immediately
if member == nil {
return nil
}

if err := etcdClient.RemoveMember(ctx, member.ID); err != nil {
return errors.Wrap(err, "failed to remove member from etcd")
}

return nil
}

// EtcdIsHealthy runs checks for every etcd member in the cluster to satisfy our definition of healthy.
// This is a best effort check and nodes can become unhealthy after the check is complete. It is not a guarantee.
// It's used as a signal for whether we should allow a target cluster to scale up, scale down, or upgrade.
// It returns a map of nodes checked along with an error for a given node.
func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error) {
var knownClusterID uint64
var knownMemberIDSet etcdutil.UInt64Set

controlPlaneNodes, err := w.getControlPlaneNodes(ctx)
if err != nil {
return nil, err
}

expectedMembers := 0
response := make(map[string]error)
for _, node := range controlPlaneNodes.Items {
name := node.Name
response[name] = nil
if node.Spec.ProviderID == "" {
response[name] = errors.New("empty provider ID")
continue
}

// Check to see if the pod is ready
etcdPodKey := ctrlclient.ObjectKey{
Namespace: metav1.NamespaceSystem,
Name: staticPodName("etcd", name),
}
pod := corev1.Pod{}
if err := w.Client.Get(ctx, etcdPodKey, &pod); err != nil {
response[name] = errors.Wrap(err, "failed to get etcd pod")
continue
}
if err := checkStaticPodReadyCondition(pod); err != nil {
// Nothing wrong here, etcd on this node is just not running.
// If it's a true failure the healthcheck will fail since it won't have checked enough members.
continue
}
// Only expect a member to report healthy if its pod is ready.
// This fixes the known state where the control plane has a crash-looping etcd pod that is not part of the
// etcd cluster.
expectedMembers++

// Create the etcd Client for the etcd Pod scheduled on the Node
etcdClient, err := w.etcdClientGenerator.forNode(ctx, name)
if err != nil {
response[name] = errors.Wrap(err, "failed to create etcd client")
continue
}

// List etcd members. This checks that the member is healthy, because the request goes through consensus.
members, err := etcdClient.Members(ctx)
if err != nil {
response[name] = errors.Wrap(err, "failed to list etcd members using etcd client")
continue
}
member := etcdutil.MemberForName(members, name)

// Check that the member reports no alarms.
if len(member.Alarms) > 0 {
response[name] = errors.Errorf("etcd member reports alarms: %v", member.Alarms)
continue
}

// Check that the member belongs to the same cluster as all other members.
clusterID := member.ClusterID
if knownClusterID == 0 {
knownClusterID = clusterID
} else if knownClusterID != clusterID {
response[name] = errors.Errorf("etcd member has cluster ID %d, but all previously seen etcd members have cluster ID %d", clusterID, knownClusterID)
continue
}

// Check that the member list is stable.
memberIDSet := etcdutil.MemberIDSet(members)
if knownMemberIDSet.Len() == 0 {
knownMemberIDSet = memberIDSet
} else {
unknownMembers := memberIDSet.Difference(knownMemberIDSet)
if unknownMembers.Len() > 0 {
response[name] = errors.Errorf("etcd member reports members IDs %v, but all previously seen etcd members reported member IDs %v", memberIDSet.UnsortedList(), knownMemberIDSet.UnsortedList())
}
continue
}
}

// TODO: ensure that each pod is owned by a node that we're managing. That would ensure there are no out-of-band etcd members

// Check that there is exactly one etcd member for every healthy pod.
// This allows us to handle the expected case where there is a failing pod but it's been removed from the member list.
if expectedMembers != len(knownMemberIDSet) {
return response, errors.Errorf("there are %d healthy etcd pods, but %d etcd members", expectedMembers, len(knownMemberIDSet))
}

return response, nil
}

// UpdateEtcdVersionInKubeadmConfigMap sets the imageRepository or the imageTag or both in the kubeadm config map.
func (w *Workload) UpdateEtcdVersionInKubeadmConfigMap(ctx context.Context, imageRepository, imageTag string) error {
configMapKey := ctrlclient.ObjectKey{Name: "kubeadm-config", Namespace: metav1.NamespaceSystem}
kubeadmConfigMap, err := w.getConfigMap(ctx, configMapKey)
if err != nil {
return err
}
config := &kubeadmConfig{ConfigMap: kubeadmConfigMap}
changed, err := config.UpdateEtcdMeta(imageRepository, imageTag)
if err != nil || !changed {
return err
}
if err := w.Client.Update(ctx, config.ConfigMap); err != nil {
return errors.Wrap(err, "error updating kubeadm ConfigMap")
}
return nil
}

// UpdateImageRepositoryInKubeadmConfigMap updates the image repository in the kubeadm config map.
func (w *Workload) UpdateImageRepositoryInKubeadmConfigMap(ctx context.Context, imageRepository string) error {
configMapKey := ctrlclient.ObjectKey{Name: "kubeadm-config", Namespace: metav1.NamespaceSystem}
@@ -328,7 +167,7 @@ func (w *Workload) UpdateImageRepositoryInKubeadmConfigMap(ctx context.Context,

// UpdateKubernetesVersionInKubeadmConfigMap updates the kubernetes version in the kubeadm config map.
func (w *Workload) UpdateKubernetesVersionInKubeadmConfigMap(ctx context.Context, version semver.Version) error {
configMapKey := ctrlclient.ObjectKey{Name: "kubeadm-config", Namespace: metav1.NamespaceSystem}
configMapKey := ctrlclient.ObjectKey{Name: kubeadmConfigKey, Namespace: metav1.NamespaceSystem}
kubeadmConfigMap, err := w.getConfigMap(ctx, configMapKey)
if err != nil {
return err
@@ -380,24 +219,14 @@ func (w *Workload) UpdateKubeletConfigMap(ctx context.Context, version semver.Ve
return nil
}

// RemoveEtcdMemberForMachine removes the etcd member from the target cluster's etcd cluster.
func (w *Workload) RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) error {
if machine == nil || machine.Status.NodeRef == nil {
// Nothing to do, no node for Machine
return nil
}

return w.removeMemberForNode(ctx, machine.Status.NodeRef.Name)
}

// RemoveMachineFromKubeadmConfigMap removes the entry for the machine from the kubeadm configmap.
func (w *Workload) RemoveMachineFromKubeadmConfigMap(ctx context.Context, machine *clusterv1.Machine) error {
if machine == nil || machine.Status.NodeRef == nil {
// Nothing to do, no node for Machine
return nil
}

configMapKey := ctrlclient.ObjectKey{Name: "kubeadm-config", Namespace: metav1.NamespaceSystem}
configMapKey := ctrlclient.ObjectKey{Name: kubeadmConfigKey, Namespace: metav1.NamespaceSystem}
kubeadmConfigMap, err := w.getConfigMap(ctx, configMapKey)
if err != nil {
return err
@@ -518,77 +347,17 @@ func (w *Workload) ClusterStatus(ctx context.Context) (ClusterStatus, error) {
}

// find the kubeadm config
kubeadmConfigKey := ctrlclient.ObjectKey{
Name: "kubeadm-config",
key := ctrlclient.ObjectKey{
Name: kubeadmConfigKey,
Namespace: metav1.NamespaceSystem,
}
err = w.Client.Get(ctx, kubeadmConfigKey, &corev1.ConfigMap{})
err = w.Client.Get(ctx, key, &corev1.ConfigMap{})
// TODO: Consider if this should only return false if the error is IsNotFound.
// TODO: Consider adding a third state of 'unknown' when there is an error retrieving the config map.
status.HasKubeadmConfig = err == nil
return status, nil
}

// ForwardEtcdLeadership forwards etcd leadership to the first follower
func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine, leaderCandidate *clusterv1.Machine) error {
if machine == nil || machine.Status.NodeRef == nil {
// Nothing to do, no node for Machine
return nil
}

// TODO we'd probably prefer to pass in all the known nodes and let grpc handle retrying connections across them
clientMachineName := machine.Status.NodeRef.Name
if leaderCandidate != nil && leaderCandidate.Status.NodeRef != nil {
// connect to the new leader candidate, in case machine's etcd membership has already been removed
clientMachineName = leaderCandidate.Status.NodeRef.Name
}

etcdClient, err := w.etcdClientGenerator.forNode(ctx, clientMachineName)
if err != nil {
return errors.Wrap(err, "failed to create etcd Client")
}

// List etcd members. This checks that the member is healthy, because the request goes through consensus.
members, err := etcdClient.Members(ctx)
if err != nil {
return errors.Wrap(err, "failed to list etcd members using etcd client")
}

currentMember := etcdutil.MemberForName(members, machine.Status.NodeRef.Name)
if currentMember == nil || currentMember.ID != etcdClient.LeaderID {
return nil
}

// Move the etcd client to the current leader, which in this case is the machine we're about to delete.
etcdClient, err = w.etcdClientGenerator.forNode(ctx, machine.Status.NodeRef.Name)
if err != nil {
return errors.Wrap(err, "failed to create etcd Client")
}

// If we don't have a leader candidate, move the leader to the next available machine.
if leaderCandidate == nil || leaderCandidate.Status.NodeRef == nil {
for _, member := range members {
if member.ID != currentMember.ID {
if err := etcdClient.MoveLeader(ctx, member.ID); err != nil {
return errors.Wrapf(err, "failed to move leader")
}
break
}
}
return nil
}

// Move the leader to the provided candidate.
nextLeader := etcdutil.MemberForName(members, leaderCandidate.Status.NodeRef.Name)
if nextLeader == nil {
return errors.Errorf("failed to get etcd member from node %q", leaderCandidate.Status.NodeRef.Name)
}
if err := etcdClient.MoveLeader(ctx, nextLeader.ID); err != nil {
return errors.Wrapf(err, "failed to move leader")
}
return nil
}

func generateClientCert(caCertEncoded, caKeyEncoded []byte) (tls.Certificate, error) {
privKey, err := certs.NewPrivateKey()
if err != nil {
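With the etcd-specific helpers removed from this file, the functions that remain can be unit tested with the controller-runtime fake client alone. A hypothetical sketch (not one of the tests added by this PR) exercising ClusterStatus and its HasKubeadmConfig flag, assuming the internal package layout and the exported Client field used above:

package internal

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func TestClusterStatusReportsKubeadmConfigPresence(t *testing.T) {
	// Seed the fake client with the ConfigMap that ClusterStatus looks up.
	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      kubeadmConfigKey,
			Namespace: metav1.NamespaceSystem,
		},
	}
	w := &Workload{Client: fake.NewFakeClient(cm)}

	status, err := w.ClusterStatus(context.Background())
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if !status.HasKubeadmConfig {
		t.Fatalf("expected HasKubeadmConfig to be true when the %q ConfigMap exists", kubeadmConfigKey)
	}
}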
2 changes: 1 addition & 1 deletion controlplane/kubeadm/internal/workload_cluster_coredns.go
@@ -204,7 +204,7 @@ func (w *Workload) updateCoreDNSDeployment(ctx context.Context, info *coreDNSInf

// updateCoreDNSImageInfoInKubeadmConfigMap updates the CoreDNS image info in the kubeadm config map.
func (w *Workload) updateCoreDNSImageInfoInKubeadmConfigMap(ctx context.Context, dns *kubeadmv1.DNS) error {
configMapKey := ctrlclient.ObjectKey{Name: "kubeadm-config", Namespace: metav1.NamespaceSystem}
configMapKey := ctrlclient.ObjectKey{Name: kubeadmConfigKey, Namespace: metav1.NamespaceSystem}
kubeadmConfigMap, err := w.getConfigMap(ctx, configMapKey)
if err != nil {
return err
@@ -459,7 +459,7 @@ func TestGetCoreDNSInfo(t *testing.T) {
func TestUpdateCoreDNSImageInfoInKubeadmConfigMap(t *testing.T) {
cm := &corev1.ConfigMap{
ObjectMeta: v1.ObjectMeta{
Name: "kubeadm-config",
Name: kubeadmConfigKey,
Namespace: metav1.NamespaceSystem,
},
Data: map[string]string{
@@ -543,7 +543,7 @@ scheduler: {}`,
g.Expect(err).ToNot(HaveOccurred())

var expectedConfigMap corev1.ConfigMap
g.Expect(fakeClient.Get(context.TODO(), ctrlclient.ObjectKey{Name: "kubeadm-config", Namespace: metav1.NamespaceSystem}, &expectedConfigMap)).To(Succeed())
g.Expect(fakeClient.Get(context.TODO(), ctrlclient.ObjectKey{Name: kubeadmConfigKey, Namespace: metav1.NamespaceSystem}, &expectedConfigMap)).To(Succeed())
g.Expect(expectedConfigMap.Data).To(HaveKeyWithValue("ClusterConfiguration", ContainSubstring("1.0.1-somever.1")))
g.Expect(expectedConfigMap.Data).To(HaveKeyWithValue("ClusterConfiguration", ContainSubstring("gcr.io/example")))
})
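A recurring change across these files is replacing the "kubeadm-config" string literal with the shared kubeadmConfigKey constant, so production code and tests address the same ConfigMap by one name. A small sketch of that pattern (the kubeadmConfigMapKey helper is hypothetical and not part of this commit; the diff above inlines the ObjectKey at each call site):

package internal

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// kubeadmConfigKey names the kubeadm ConfigMap in the kube-system namespace.
const kubeadmConfigKey = "kubeadm-config"

// kubeadmConfigMapKey builds the ObjectKey used to fetch that ConfigMap.
func kubeadmConfigMapKey() ctrlclient.ObjectKey {
	return ctrlclient.ObjectKey{Name: kubeadmConfigKey, Namespace: metav1.NamespaceSystem}
}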
