Skip to content
This repository has been archived by the owner on Oct 15, 2020. It is now read-only.

Commit

Permalink
Merge pull request #52 from errm/containerd
Browse files Browse the repository at this point in the history
Adds support for containerd (cri) runtime
  • Loading branch information
errm committed Apr 9, 2019
2 parents 5350109 + 9b88d65 commit 19049eb
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 38 deletions.
5 changes: 1 addition & 4 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
linters-settings:
lll:
line-length: 141

linters:
enable-all: true
disable:
- prealloc
- gochecknoglobals
- dupl
- lll

service:
golangci-lint-version: 1.15.x # use the fixed version to not introduce new linters unexpectedly
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAG
golang.org/x/perf v0.0.0-20180704124530-6e6d33e29852/go.mod h1:JLpeXjPJfIyPr5TlbXLkXWLhP8nz10XfvxElABhCtcw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180816055513-1c9583448a9c/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
10 changes: 6 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,18 @@ func region() *string {
}

func main() {
instance, err := node.New(ec2.New(sess), metadata, region())
systemdDbus, err := dbus.New()
check(err)

cluster, err := eks.Cluster(eksSvc.New(sess), instance.ClusterName())
systemd := &system.Systemd{Conn: systemdDbus}
containerRuntime, err := systemd.ContainerRuntime()
check(err)

systemdDbus, err := dbus.New()
instance, err := node.New(ec2.New(sess), metadata, region(), containerRuntime)
check(err)

systemd := &system.Systemd{Conn: systemdDbus}
cluster, err := eks.Cluster(eksSvc.New(sess), instance.ClusterName())
check(err)

system := system.System{
Filesystem: &file.Atomic{},
Expand Down
10 changes: 6 additions & 4 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
// Node represents and EC2 instance.
type Node struct {
*ec2.Instance
Region string
Region string
ContainerRuntime string
}

type metadataClient interface {
Expand All @@ -45,7 +46,7 @@ var b = backoff.Backoff{Seq: []int{1, 1, 2}}
//
// If the EC2 instance doesn't have the expected kubernetes tag, it will backoff and retry.
// If it isn't able to query EC2 or there are any other errors, an error will be returned.
func New(e ec2iface.EC2API, m metadataClient, region *string) (*Node, error) {
func New(e ec2iface.EC2API, m metadataClient, region *string, containerRuntime string) (*Node, error) {
id, err := instanceID(m)
if err != nil {
return nil, err
Expand All @@ -58,8 +59,9 @@ func New(e ec2iface.EC2API, m metadataClient, region *string) (*Node, error) {
}
instance := output.Reservations[0].Instances[0]
node := Node{
Instance: instance,
Region: *region,
Instance: instance,
Region: *region,
ContainerRuntime: containerRuntime,
}
if node.ClusterName() == "" {
sleepFor := b.Duration(tries)
Expand Down
22 changes: 11 additions & 11 deletions pkg/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestNewNode(t *testing.T) {
"instance-id": "1234",
},
}
node, err := New(e, metadata, &use1)
node, err := New(e, metadata, &use1, "docker")
if err != nil {
t.Errorf("unexpected error: %s", err)
}
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestNodeLabels(t *testing.T) {
"instance-id": "1234",
},
}
node, err := New(e, metadata, &use1)
node, err := New(e, metadata, &use1, "docker")
if err != nil {
t.Errorf("unexpected error: %s", err)
}
Expand All @@ -110,7 +110,7 @@ func TestNodeLabels(t *testing.T) {
},
instanceLifecycle: ec2.InstanceLifecycleTypeSpot,
}
node, err = New(e, metadata, &use1)
node, err = New(e, metadata, &use1, "docker")
if err != nil {
t.Errorf("unexpected error: %s", err)
}
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestNodeTaints(t *testing.T) {
"instance-id": "1234",
},
}
node, err := New(e, metadata, &use1)
node, err := New(e, metadata, &use1, "docker")
if err != nil {
t.Errorf("unexpected error: %s", err)
}
Expand All @@ -165,7 +165,7 @@ func TestClusterDNS(t *testing.T) {
{tag("kubernetes.io/cluster/cluster-name", "owned")},
},
}
node, err := New(e, mockMetadata{}, &use1)
node, err := New(e, mockMetadata{}, &use1, "docker")

if err != nil {
t.Errorf("unexpected error: %s", err)
Expand All @@ -181,7 +181,7 @@ func TestClusterDNS(t *testing.T) {
{tag("kubernetes.io/cluster/cluster-name", "owned")},
},
}
node, err = New(e, mockMetadata{}, &use1)
node, err = New(e, mockMetadata{}, &use1, "docker")

if err != nil {
t.Errorf("unexpected error: %s", err)
Expand All @@ -199,7 +199,7 @@ func TestNewErrors(t *testing.T) {
e := &mockEC2{err: ec2Error}
metadata := mockMetadata{err: metadataError}

_, err := New(e, metadata, &use1)
_, err := New(e, metadata, &use1, "docker")
if err != metadataError {
t.Errorf("expected error: %s to be %s", err, metadataError)
}
Expand All @@ -210,7 +210,7 @@ func TestNewErrors(t *testing.T) {
},
}

_, err = New(e, metadata, &use1)
_, err = New(e, metadata, &use1, "docker")
if err != ec2Error {
t.Errorf("expected error: %s to be %s", err, ec2Error)
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func TestMaxPods(t *testing.T) {
"instance-id": "1234",
},
}
node, err := New(e, metadata, &usw2)
node, err := New(e, metadata, &usw2, "docker")
if err != nil {
t.Errorf("unexpected error: %s", err)
}
Expand Down Expand Up @@ -369,7 +369,7 @@ func TestReservedCPU(t *testing.T) {
"instance-id": "1234",
},
}
node, err := New(e, metadata, &usw2)
node, err := New(e, metadata, &usw2, "docker")
if err != nil {
t.Errorf("unexpected error: %s", err)
}
Expand Down Expand Up @@ -439,7 +439,7 @@ func TestMemory(t *testing.T) {
"instance-id": "1234",
},
}
node, err := New(e, metadata, &usw2)
node, err := New(e, metadata, &usw2, "docker")
if err != nil {
t.Errorf("unexpected error: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s System) Configure(n *node.Node, cluster *eks.Cluster) error {

func (s System) configs() ([]config, error) {
configs := []config{}
box := packr.NewBox("./templates")
box := packr.New("system templates", "./templates")
err := box.Walk(func(path string, f packr.File) error {
template, err := template.New(path).Funcs(template.FuncMap{"b64dec": base64decode}).Parse(f.String())
configs = append(configs, config{
Expand Down
68 changes: 58 additions & 10 deletions pkg/system/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestConfigure(t *testing.T) {
hn := &FakeHostname{}
init := &FakeInit{}

i := instance(map[string]string{}, false)
i := instance(map[string]string{}, false, "docker")
c := cluster()
system := System{Filesystem: fs, Hostname: hn, Init: init}
err := system.Configure(i, c)
Expand All @@ -42,8 +42,8 @@ func TestConfigure(t *testing.T) {
t.Errorf("unexpected error %v", err)
}

if len(fs.files) != 7 {
t.Errorf("expected 7 files, got %v", len(fs.files))
if len(fs.files) != 8 {
t.Errorf("expected 8 files, got %v", len(fs.files))
}

expected := `apiVersion: v1
Expand Down Expand Up @@ -83,9 +83,8 @@ ExecStart=/usr/bin/kubelet \
--allow-privileged=true \
--cloud-provider=aws \
--config /etc/kubernetes/kubelet/config.yaml \
--container-runtime=docker \
--network-plugin=cni \
--kubeconfig=/var/lib/kubelet/kubeconfig $KUBELET_ARGS $KUBELET_NODE_LABELS $KUBELET_NODE_TAINTS $KUBELET_EXTRA_ARGS
--kubeconfig=/var/lib/kubelet/kubeconfig $KUBELET_CONTAINER_RUNTIME_ARGS $KUBELET_ARGS $KUBELET_NODE_LABELS $KUBELET_NODE_TAINTS $KUBELET_EXTRA_ARGS
Restart=always
StartLimitInterval=0
Expand Down Expand Up @@ -142,6 +141,11 @@ Environment='KUBELET_NODE_LABELS=--node-labels="node-role.kubernetes.io/worker=t
expected = `[Service]`
fs.Check(t, "/etc/systemd/system/kubelet.service.d/30-taints.conf", expected, 0640)

expected = `[Service]
Environment="KUBELET_CONTAINER_RUNTIME_ARGS=--container-runtime=docker"
`
fs.Check(t, "/etc/systemd/system/kubelet.service.d/40-container-runtime.conf", expected, 0640)

expected = `thisisthecertdata
`
fs.Check(t, "/etc/kubernetes/pki/ca.crt", expected, 0640)
Expand All @@ -168,7 +172,7 @@ func TestConfigureSpotInstanceLabels(t *testing.T) {
"node-role.kubernetes.io/worker": "true",
}

i := instance(tags, true)
i := instance(tags, true, "docker")
c := cluster()
system := System{Filesystem: fs, Hostname: hn, Init: init}
err := system.Configure(i, c)
Expand All @@ -192,7 +196,7 @@ func TestConfigureLabels(t *testing.T) {
"k8s.io/cluster-autoscaler/node-template/label/gpu-type": "K80",
}

i := instance(tags, false)
i := instance(tags, false, "docker")
c := cluster()
system := System{Filesystem: fs, Hostname: hn, Init: init}
err := system.Configure(i, c)
Expand All @@ -216,7 +220,7 @@ func TestConfigureTaints(t *testing.T) {
"k8s.io/cluster-autoscaler/node-template/taint/node-role.kubernetes.io/worker": "true:PreferNoSchedule",
}

i := instance(tags, false)
i := instance(tags, false, "docker")
c := cluster()
system := System{Filesystem: fs, Hostname: hn, Init: init}
err := system.Configure(i, c)
Expand All @@ -231,7 +235,50 @@ Environment='KUBELET_NODE_TAINTS=--register-with-taints="node-role.kubernetes.io
fs.Check(t, "/etc/systemd/system/kubelet.service.d/30-taints.conf", expected, 0640)
}

func instance(tags map[string]string, spot bool) *node.Node {
func TestContainerd(t *testing.T) {
fs := &FakeFileSystem{}
hn := &FakeHostname{}
init := &FakeInit{}

i := instance(map[string]string{}, false, "containerd")
c := cluster()
system := System{Filesystem: fs, Hostname: hn, Init: init}
err := system.Configure(i, c)

if err != nil {
t.Errorf("unexpected error %v", err)
}

expected := `[Unit]
Description=kubelet: The Kubernetes Node Agent
Documentation=http://kubernetes.io/docs/
After=containerd.service
Requires=containerd.service
[Service]
ExecStart=/usr/bin/kubelet \
--allow-privileged=true \
--cloud-provider=aws \
--config /etc/kubernetes/kubelet/config.yaml \
--network-plugin=cni \
--kubeconfig=/var/lib/kubelet/kubeconfig $KUBELET_CONTAINER_RUNTIME_ARGS $KUBELET_ARGS $KUBELET_NODE_LABELS $KUBELET_NODE_TAINTS $KUBELET_EXTRA_ARGS
Restart=always
StartLimitInterval=0
RestartSec=5
[Install]
WantedBy=multi-user.target
`
fs.Check(t, "/etc/systemd/system/kubelet.service", expected, 0640)

expected = `[Service]
Environment="KUBELET_CONTAINER_RUNTIME_ARGS=--container-runtime=remote --runtime-request-timeout=15m --container-runtime-endpoint=unix:///run/containerd/containerd.sock --cgroup-driver=systemd"
`
fs.Check(t, "/etc/systemd/system/kubelet.service.d/40-container-runtime.conf", expected, 0640)
}

func instance(tags map[string]string, spot bool, runtime string) *node.Node {
ip := "10.6.28.199"
dnsName := "ip-10-6-28-199.us-west-2.compute.internal"
var ec2tags []*ec2.Tag
Expand All @@ -257,7 +304,8 @@ func instance(tags map[string]string, spot bool) *node.Node {
InstanceType: &instanceType,
InstanceLifecycle: instanceLifecycle,
},
Region: "us-east-1",
Region: "us-east-1",
ContainerRuntime: runtime,
}
}

Expand Down
19 changes: 19 additions & 0 deletions pkg/system/systemd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package system

import (
"errors"
"log"
"os"
"os/exec"
Expand All @@ -28,6 +29,7 @@ type dbusConn interface {
Reload() error
EnableUnitFiles([]string, bool, bool) (bool, []dbus.EnableUnitFileChange, error)
RestartUnit(string, string, chan<- string) (int, error)
ListUnits() ([]dbus.UnitStatus, error)
}

// Systemd allows you to interact with the systemd init system.
Expand Down Expand Up @@ -55,3 +57,20 @@ func (s *Systemd) SetHostname(hostname string) error {
log.Printf("setting hostname to %s", hostname)
return exec.Command("hostnamectl", "set-hostname", hostname).Run()
}

func (s *Systemd) ContainerRuntime() (string, error) {
candidates := map[string]string{
"containerd.service": "containerd",
"docker.service": "docker",
}
units, err := s.Conn.ListUnits()
if err != nil {
return "", err
}
for _, unit := range units {
if value, ok := candidates[unit.Name]; ok && unit.LoadState == "loaded" {
return value, nil
}
}
return "", errors.New("couldn't work out what container runtime is installed")
}
Loading

0 comments on commit 19049eb

Please sign in to comment.