Skip to content

Commit

Permalink
handling image change events
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Minář <miminar@redhat.com>
  • Loading branch information
Michal Minář committed Apr 26, 2018
1 parent f6444e5 commit ca5dfd3
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 38 deletions.
54 changes: 49 additions & 5 deletions pkg/oc/admin/prune/imageprune/prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,13 @@ type PrunerOptions struct {
// Images is the entire list of images in OpenShift. An image must be in this
// list to be a candidate for pruning.
Images *imageapi.ImageList
// ImageWatcher watches for image changes.
ImageWatcher watch.Interface
// Streams is the entire list of image streams across all namespaces in the
// cluster.
Streams *imageapi.ImageStreamList
// StreamsWatcher watches for stream changes.
StreamsWatcher watch.Interface
// StreamWatcher watches for stream changes.
StreamWatcher watch.Interface
// Pods is the entire list of pods across all namespaces in the cluster.
Pods *kapi.PodList
// RCs is the entire list of replication controllers across all namespaces in
Expand Down Expand Up @@ -205,6 +207,7 @@ type pruner struct {
algorithm pruneAlgorithm
registryClientFactory RegistryClientFactoryFunc
registryURL *url.URL
imageWatcher watch.Interface
imageStreamWatcher watch.Interface
imageStreamLimits map[string][]*kapi.LimitRange
// sorted queue of images to prune
Expand Down Expand Up @@ -292,7 +295,8 @@ func NewPruner(options PrunerOptions) (Pruner, kerrors.Aggregate) {
registryClientFactory: options.RegistryClientFactory,
registryURL: options.RegistryURL,
processedImages: make(map[*imagegraph.ImageNode]*Job),
imageStreamWatcher: options.StreamsWatcher,
imageWatcher: options.ImageWatcher,
imageStreamWatcher: options.StreamWatcher,
imageStreamLimits: options.LimitRanges,
numWorkers: options.NumWorkers,
}
Expand Down Expand Up @@ -828,14 +832,52 @@ func (p *pruner) handleImageStreamEvent(event watch.Event) {
if isNode != nil {
glog.V(4).Infof("Removing updated ImageStream %s from the graph", getName(is))
// first remove the current node if present
p.g.RemoveNode(imagegraph.EnsureImageStreamNode(p.g, is))
p.g.RemoveNode(isNode)
}

glog.V(4).Infof("Adding updated ImageStream %s back to the graph", getName(is))
p.addImageStreamsToGraph(&imageapi.ImageStreamList{Items: []imageapi.ImageStream{*is}}, p.imageStreamLimits)
}
}

func (p *pruner) handleImageEvent(event watch.Event) {
getImageNode := func() (*imageapi.Image, *imagegraph.ImageNode) {
img, ok := event.Object.(*imageapi.Image)
if !ok {
utilruntime.HandleError(fmt.Errorf("internal error: expected Image object in %s event, not %T", event.Type, event.Object))
return nil, nil
}
n := p.g.Find(imagegraph.ImageNodeName(img))
if imgNode, ok := n.(*imagegraph.ImageNode); ok {
return img, imgNode
}
return img, nil
}

switch event.Type {
case watch.Added:
img, imgNode := getImageNode()
if img == nil {
return
}
if imgNode != nil {
glog.V(4).Infof("Ignoring added Image %s that is already present in the graph", img)
return
}
glog.V(4).Infof("Adding new Image %s to the graph", img.Name)
p.addImagesToGraph(&imageapi.ImageList{Items: []imageapi.Image{*img}})

case watch.Deleted:
img, imgNode := getImageNode()
if imgNode == nil {
glog.V(4).Infof("Ignoring event for deleted Image %s that is not present in the graph", img.Name)
return
}
glog.V(4).Infof("Removing deleted image %s from the graph", img.Name)
p.g.RemoveNode(imgNode)
}
}

// getImageNodes returns only nodes of type ImageNode.
func getImageNodes(nodes []gonum.Node) map[string]*imagegraph.ImageNode {
ret := make(map[string]*imagegraph.ImageNode)
Expand Down Expand Up @@ -1205,6 +1247,7 @@ func (p *pruner) runLoop(
jobChan chan<- *Job,
resultChan <-chan JobResult,
) (deletions []Deletion, failures []Failure) {
imgUpdateChan := p.imageWatcher.ResultChan()
isUpdateChan := p.imageStreamWatcher.ResultChan()
for {
// make workers busy
Expand Down Expand Up @@ -1235,7 +1278,8 @@ func (p *pruner) runLoop(
delete(p.processedImages, res.Job.Image)
case event := <-isUpdateChan:
p.handleImageStreamEvent(event)
// TODO: handle new images - do not add them to the queue though
case event := <-imgUpdateChan:
p.handleImageEvent(event)
}
}
}
Expand Down
58 changes: 31 additions & 27 deletions pkg/oc/admin/prune/imageprune/prune_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,8 +950,9 @@ func TestImagePruning(t *testing.T) {
Namespace: test.namespace,
AllImages: test.allImages,
Images: &test.images,
ImageWatcher: watch.NewFake(),
Streams: &test.streams,
StreamsWatcher: watch.NewFake(),
StreamWatcher: watch.NewFake(),
Pods: &test.pods,
RCs: &test.rcs,
BCs: &test.bcs,
Expand Down Expand Up @@ -1318,8 +1319,9 @@ func TestRegistryPruning(t *testing.T) {
KeepTagRevisions: &keepTagRevisions,
PruneRegistry: &test.pruneRegistry,
Images: &test.images,
ImageWatcher: watch.NewFake(),
Streams: &test.streams,
StreamsWatcher: watch.NewFake(),
StreamWatcher: watch.NewFake(),
Pods: &kapi.PodList{},
RCs: &kapi.ReplicationControllerList{},
BCs: &buildapi.BuildConfigList{},
Expand Down Expand Up @@ -1393,17 +1395,18 @@ func TestImageWithStrongAndWeakRefsIsNotPruned(t *testing.T) {
rss := testutil.RSList()

options := PrunerOptions{
Images: &images,
Streams: &streams,
StreamsWatcher: watch.NewFake(),
Pods: &pods,
RCs: &rcs,
BCs: &bcs,
Builds: &builds,
DSs: &dss,
Deployments: &deployments,
DCs: &dcs,
RSs: &rss,
Images: &images,
ImageWatcher: watch.NewFake(),
Streams: &streams,
StreamWatcher: watch.NewFake(),
Pods: &pods,
RCs: &rcs,
BCs: &bcs,
Builds: &builds,
DSs: &dss,
Deployments: &deployments,
DCs: &dcs,
RSs: &rss,
}
keepYoungerThan := 24 * time.Hour
keepTagRevisions := 2
Expand Down Expand Up @@ -1662,7 +1665,7 @@ func TestChangeImageStreamsWhilePruning(t *testing.T) {
)

streams := testutil.StreamList(testutil.Stream("registry1", "foo", "bar", testutil.Tags()))
streamsWatcher := watch.NewFake()
streamWatcher := watch.NewFake()
pods := testutil.PodList()
rcs := testutil.RCList()
bcs := testutil.BCList()
Expand All @@ -1673,17 +1676,18 @@ func TestChangeImageStreamsWhilePruning(t *testing.T) {
rss := testutil.RSList()

options := PrunerOptions{
Images: &images,
Streams: &streams,
StreamsWatcher: streamsWatcher,
Pods: &pods,
RCs: &rcs,
BCs: &bcs,
Builds: &builds,
DSs: &dss,
Deployments: &deployments,
DCs: &dcs,
RSs: &rss,
Images: &images,
ImageWatcher: watch.NewFake(),
Streams: &streams,
StreamWatcher: streamWatcher,
Pods: &pods,
RCs: &rcs,
BCs: &bcs,
Builds: &builds,
DSs: &dss,
Deployments: &deployments,
DCs: &dcs,
RSs: &rss,
RegistryClientFactory: FakeRegistryClientFactory,
RegistryURL: &url.URL{Scheme: "https", Host: "registry1.io"},
NumWorkers: 1,
Expand Down Expand Up @@ -1731,7 +1735,7 @@ func TestChangeImageStreamsWhilePruning(t *testing.T) {
testutil.Tag("latest",
testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000002", "registry1/foo/new@sha256:0000000000000000000000000000000000000000000000000000000000000002"),
)))
streamsWatcher.Add(&stream)
streamWatcher.Add(&stream)
imageDeleter.unblock()

// the pruner shall skip the newly referenced image
Expand All @@ -1748,7 +1752,7 @@ func TestChangeImageStreamsWhilePruning(t *testing.T) {
testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000000", "registry1/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000000"),
testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000004", "registry1/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000004"),
)))
streamsWatcher.Modify(&stream)
streamWatcher.Modify(&stream)
imageDeleter.unblock()

// the pruner shall skip the newly referenced image
Expand Down
19 changes: 13 additions & 6 deletions pkg/oc/admin/prune/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,6 @@ func (o PruneImagesOptions) Validate() error {

// Run contains all the necessary functionality for the OpenShift cli prune images command.
func (o PruneImagesOptions) Run() error {
allImages, err := o.ImageClient.Images().List(metav1.ListOptions{})
if err != nil {
return err
}

allPods, err := o.KubeClient.Core().Pods(o.Namespace).List(metav1.ListOptions{})
if err != nil {
return err
Expand Down Expand Up @@ -327,6 +322,17 @@ func (o PruneImagesOptions) Run() error {
limitRangesMap[limit.Namespace] = limits
}

allImages, err := o.ImageClient.Images().List(metav1.ListOptions{})
if err != nil {
return err
}
imageWatcher, err := o.Imageclient.Images().Watch(metav1.ListOptions{})
if err != nil {
utilruntime.HandleError(fmt.Errorf("internal error: failed to watch for images: %v"+
"\n - image changes will not be detected", err))
imageWatcher = watch.NewFake()
}

imageStreamWatcher, err := o.ImageClient.ImageStreams(o.Namespace).Watch(metav1.ListOptions{})
if err != nil {
utilruntime.HandleError(fmt.Errorf("internal error: failed to watch for image streams: %v"+
Expand Down Expand Up @@ -393,8 +399,9 @@ func (o PruneImagesOptions) Run() error {
PruneOverSizeLimit: o.PruneOverSizeLimit,
AllImages: o.AllImages,
Images: allImages,
ImageWatcher: imageWatcher,
Streams: allStreams,
StreamsWatcher: imageStreamWatcher,
StreamWatcher: imageStreamWatcher,
Pods: allPods,
RCs: allRCs,
BCs: allBCs,
Expand Down

0 comments on commit ca5dfd3

Please sign in to comment.