Skip to content

Commit

Permalink
Merge pull request #50 from senthilrch/senthilrch
Browse files Browse the repository at this point in the history
Senthilrch
  • Loading branch information
senthilrch authored Jan 3, 2020
2 parents 7763581 + bd50410 commit 93dae8f
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 48 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ These instructions will help you build _kube-fledged_ from source and deploy it
### Prerequisites

- A functioning kubernetes cluster (v1.7 or above). It could be a simple development cluster like minikube or a large production cluster.
- All master and worker nodes with docker engine installed, and having the ["kubernetes.io/hostname"](https://kubernetes.io/docs/reference/kubernetes-api/labels-annotations-taints/#kubernetes-io-hostname) label.
- All master and worker nodes having the ["kubernetes.io/hostname"](https://kubernetes.io/docs/reference/kubernetes-api/labels-annotations-taints/#kubernetes-io-hostname) label.
- Supported container runtimes: docker, containerd
- make, go, docker and kubectl installed on a local linux machine. kubectl configured properly to access the cluster.

### Build
Expand Down
24 changes: 14 additions & 10 deletions cmd/app/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,10 @@ func (c *Controller) enqueueImageCache(workType images.WorkType, old, new interf
newImageCache := new.(*fledgedv1alpha1.ImageCache)

if oldImageCache.Status.Status == fledgedv1alpha1.ImageCacheActionStatusProcessing {
glog.Errorf("Received image cache update/purge/delete for '%s' while it is under processing, so ignoring.", oldImageCache.Name)
return false
if !reflect.DeepEqual(newImageCache.Spec, oldImageCache.Spec) {
glog.Warningf("Received image cache update/purge/delete for '%s' while it is under processing, so ignoring.", oldImageCache.Name)
return false
}
}
if _, exists := newImageCache.Annotations[imageCachePurgeAnnotationKey]; exists {
if _, exists := oldImageCache.Annotations[imageCachePurgeAnnotationKey]; !exists {
Expand Down Expand Up @@ -556,10 +558,11 @@ func (c *Controller) syncHandler(wqKey images.WorkQueueKey) error {
for _, n := range nodes {
for m := range i.Images {
ipr := images.ImageWorkRequest{
Image: i.Images[m],
Node: n.Labels["kubernetes.io/hostname"],
WorkType: wqKey.WorkType,
Imagecache: imageCache,
Image: i.Images[m],
Node: n.Labels["kubernetes.io/hostname"],
ContainerRuntimeVersion: n.Status.NodeInfo.ContainerRuntimeVersion,
WorkType: wqKey.WorkType,
Imagecache: imageCache,
}
c.imageworkqueue.AddRateLimited(ipr)
}
Expand All @@ -574,10 +577,11 @@ func (c *Controller) syncHandler(wqKey images.WorkQueueKey) error {
}
if !matched {
ipr := images.ImageWorkRequest{
Image: oldimage,
Node: n.Labels["kubernetes.io/hostname"],
WorkType: images.ImageCachePurge,
Imagecache: imageCache,
Image: oldimage,
Node: n.Labels["kubernetes.io/hostname"],
ContainerRuntimeVersion: n.Status.NodeInfo.ContainerRuntimeVersion,
WorkType: images.ImageCachePurge,
Imagecache: imageCache,
}
c.imageworkqueue.AddRateLimited(ipr)
}
Expand Down
100 changes: 73 additions & 27 deletions pkg/images/image_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func newImagePullJob(imagecache *fledgedv1alpha1.ImageCache, image string, hostn
}

// newImageDeleteJob constructs a job manifest to delete an image from a node
func newImageDeleteJob(imagecache *fledgedv1alpha1.ImageCache, image string, hostname string, dockerclientimage string) (*batchv1.Job, error) {
func newImageDeleteJob(imagecache *fledgedv1alpha1.ImageCache, image string, hostname string, containerRuntimeVersion string, dockerclientimage string) (*batchv1.Job, error) {
if imagecache == nil {
glog.Error("imagecache pointer is nil")
return nil, fmt.Errorf("imagecache pointer is nil")
Expand All @@ -146,6 +146,76 @@ func newImageDeleteJob(imagecache *fledgedv1alpha1.ImageCache, image string, hos
backoffLimit := int32(0)
activeDeadlineSeconds := int64((time.Hour).Seconds())

var containerRuntime string
if strings.Contains(containerRuntimeVersion, "docker") {
containerRuntime = "docker"
}
if strings.Contains(containerRuntimeVersion, "containerd") {
containerRuntime = "containerd"
}

containerSpec := map[string]struct {
Containers []corev1.Container
Volumes []corev1.Volume
}{
"docker": {
Containers: []corev1.Container{
{
Name: "docker-client",
Image: dockerclientimage,
Command: []string{"/bin/bash"},
Args: []string{"-c", "exec /usr/bin/docker image rm -f " + image + " > /dev/termination-log 2>&1"},
VolumeMounts: []corev1.VolumeMount{
{
Name: "docker-sock",
MountPath: "/var/run/docker.sock",
},
},
ImagePullPolicy: corev1.PullIfNotPresent,
},
},
Volumes: []corev1.Volume{
{
Name: "docker-sock",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/var/run/docker.sock",
Type: &hostpathtype,
},
},
},
},
},
"containerd": {
Containers: []corev1.Container{
{
Name: "crictl-client",
Image: dockerclientimage,
Command: []string{"/bin/bash"},
Args: []string{"-c", "exec /usr/bin/crictl --runtime-endpoint=unix:///run/containerd/containerd.sock --image-endpoint=unix:///run/containerd/containerd.sock rmi " + image + " > /dev/termination-log 2>&1"},
VolumeMounts: []corev1.VolumeMount{
{
Name: "containerd-sock",
MountPath: "/run/containerd/containerd.sock",
},
},
ImagePullPolicy: corev1.PullIfNotPresent,
},
},
Volumes: []corev1.Volume{
{
Name: "containerd-sock",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/run/containerd/containerd.sock",
Type: &hostpathtype,
},
},
},
},
},
}

job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
GenerateName: imagecache.Name + "-",
Expand All @@ -171,32 +241,8 @@ func newImageDeleteJob(imagecache *fledgedv1alpha1.ImageCache, image string, hos
NodeSelector: map[string]string{
"kubernetes.io/hostname": hostname,
},
Containers: []corev1.Container{
{
Name: "docker-client",
Image: dockerclientimage,
Command: []string{"/bin/bash"},
Args: []string{"-c", "exec /usr/bin/docker image rm -f " + image + " > /dev/termination-log 2>&1"},
VolumeMounts: []corev1.VolumeMount{
{
Name: "docker-sock",
MountPath: "/var/run/docker.sock",
},
},
ImagePullPolicy: corev1.PullIfNotPresent,
},
},
Volumes: []corev1.Volume{
{
Name: "docker-sock",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/var/run/docker.sock",
Type: &hostpathtype,
},
},
},
},
Containers: containerSpec[containerRuntime].Containers,
Volumes: containerSpec[containerRuntime].Volumes,
RestartPolicy: corev1.RestartPolicyNever,
ImagePullSecrets: imagecache.Spec.ImagePullSecrets,
Tolerations: []corev1.Toleration{
Expand Down
21 changes: 11 additions & 10 deletions pkg/images/image_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,11 @@ type ImageManager struct {

// ImageWorkRequest has image name, node name, work type and imagecache
type ImageWorkRequest struct {
Image string
Node string
WorkType WorkType
Imagecache *fledgedv1alpha1.ImageCache
Image string
Node string
ContainerRuntimeVersion string
WorkType WorkType
Imagecache *fledgedv1alpha1.ImageCache
}

// ImageWorkResult stores the result of pulling and deleting image
Expand Down Expand Up @@ -166,9 +167,9 @@ func (m *ImageManager) handlePodStatusChange(pod *corev1.Pod) {
if pod.Status.Phase == corev1.PodSucceeded {
iwres.Status = ImageWorkResultStatusSucceeded
if iwres.ImageWorkRequest.WorkType == ImageCachePurge {
glog.Infof("Job %s succeeded (delete: %s --> %s)", pod.Labels["job-name"], iwres.ImageWorkRequest.Image, iwres.ImageWorkRequest.Node)
glog.Infof("Job %s succeeded (delete:- %s --> %s, runtime: %s)", pod.Labels["job-name"], iwres.ImageWorkRequest.Image, iwres.ImageWorkRequest.Node, iwres.ImageWorkRequest.ContainerRuntimeVersion)
} else {
glog.Infof("Job %s succeeded (pull: %s --> %s)", pod.Labels["job-name"], iwres.ImageWorkRequest.Image, iwres.ImageWorkRequest.Node)
glog.Infof("Job %s succeeded (pull:- %s --> %s, runtime: %s)", pod.Labels["job-name"], iwres.ImageWorkRequest.Image, iwres.ImageWorkRequest.Node, iwres.ImageWorkRequest.ContainerRuntimeVersion)
}
}
if pod.Status.Phase == corev1.PodFailed {
Expand Down Expand Up @@ -397,13 +398,13 @@ func (m *ImageManager) processNextWorkItem() bool {
if err != nil {
return fmt.Errorf("error deleting image '%s' from node '%s': %s", iwr.Image, iwr.Node, err.Error())
}
glog.Infof("Job %s created (delete: %s --> %s)", job.Name, iwr.Image, iwr.Node)
glog.Infof("Job %s created (delete:- %s --> %s, runtime: %s)", job.Name, iwr.Image, iwr.Node, iwr.ContainerRuntimeVersion)
} else {
job, err = m.pullImage(iwr)
if err != nil {
return fmt.Errorf("error pulling image '%s' to node '%s': %s", iwr.Image, iwr.Node, err.Error())
}
glog.Infof("Job %s created (pull: %s --> %s)", job.Name, iwr.Image, iwr.Node)
glog.Infof("Job %s created (pull:- %s --> %s, runtime: %s)", job.Name, iwr.Image, iwr.Node, iwr.ContainerRuntimeVersion)
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
Expand Down Expand Up @@ -442,12 +443,12 @@ func (m *ImageManager) pullImage(iwr ImageWorkRequest) (*batchv1.Job, error) {
// deleteImage deletes the image from the node
func (m *ImageManager) deleteImage(iwr ImageWorkRequest) (*batchv1.Job, error) {
// Construct the Job manifest
newjob, err := newImageDeleteJob(iwr.Imagecache, iwr.Image, iwr.Node, m.dockerClientImage)
newjob, err := newImageDeleteJob(iwr.Imagecache, iwr.Image, iwr.Node, iwr.ContainerRuntimeVersion, m.dockerClientImage)
if err != nil {
glog.Errorf("Error when constructing job manifest: %v", err)
return nil, err
}
// Create a Job to pull the image into the node
// Create a Job to delete the image from the node
job, err := m.kubeclientset.BatchV1().Jobs(fledgedNameSpace).Create(newjob)
if err != nil {
glog.Errorf("Error creating job in node %s: %v", iwr.Node, err)
Expand Down

0 comments on commit 93dae8f

Please sign in to comment.