Skip to content

Commit

Permalink
change deployments to use informers rather than client
Browse files Browse the repository at this point in the history
(Currently the test is passing when it shouldn't be.)
  • Loading branch information
Jeff Peeler committed Oct 10, 2018
1 parent ef655cf commit e88a316
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 54 deletions.
44 changes: 24 additions & 20 deletions pkg/controller/operators/olm/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
cappsv1 "k8s.io/client-go/listers/apps/v1"
crbacv1 "k8s.io/client-go/listers/rbac/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -55,6 +56,7 @@ type Operator struct {
clusterRoleLister crbacv1.ClusterRoleLister
clusterRoleBindingLister crbacv1.ClusterRoleBindingLister
operatorGroupLister map[string]operatorgrouplister.OperatorGroupLister
deploymentLister map[string]cappsv1.DeploymentLister
annotator *annotator.Annotator
cleanupFunc func()
}
Expand Down Expand Up @@ -163,10 +165,13 @@ func NewOperator(crClient versioned.Interface, opClient operatorclient.ClientInt

// set up watch on deployments
depInformers := []cache.SharedIndexInformer{}
op.deploymentLister = make(map[string]cappsv1.DeploymentLister, len(namespaces))
for _, namespace := range namespaces {
log.Debugf("watching deployments in namespace %s", namespace)
informer := informers.NewSharedInformerFactoryWithOptions(opClient.KubernetesInterface(), wakeupInterval, informers.WithNamespace(namespace)).Apps().V1().Deployments().Informer()
depInformers = append(depInformers, informer)
informerFactory := informers.NewSharedInformerFactoryWithOptions(opClient.KubernetesInterface(), wakeupInterval, informers.WithNamespace(namespace))
informer := informerFactory.Apps().V1().Deployments()
depInformers = append(depInformers, informer.Informer())
op.deploymentLister[namespace] = informer.Lister()
}

depQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csv-deployments")
Expand Down Expand Up @@ -321,17 +326,7 @@ func (a *Operator) updateDeploymentAnnotation(op *v1alpha2.OperatorGroup) (error

currentNamespace := op.GetNamespace()
csvsInNamespace := a.csvsInNamespace(currentNamespace)
for csvName, csv := range csvsInNamespace {
strategy, err := a.resolver.UnmarshalStrategy(csv.Spec.InstallStrategy)
if err != nil {
return fmt.Errorf("error unmarshaling strategy from ClusterServiceVersion '%s' with error: %s", csvName, err), namespaceList.Items
}

strategyDetailsDeployment, ok := strategy.(*install.StrategyDetailsDeployment)
if !ok {
return fmt.Errorf("could not assert strategy implementation as deployment for CSV %s", csvName), namespaceList.Items
}

for _, csv := range csvsInNamespace {
managerPolicyRules := []rbacv1.PolicyRule{}
apiEditPolicyRules := []rbacv1.PolicyRule{}
apiViewPolicyRules := []rbacv1.PolicyRule{}
Expand Down Expand Up @@ -383,14 +378,23 @@ func (a *Operator) updateDeploymentAnnotation(op *v1alpha2.OperatorGroup) (error
return err, namespaceList.Items
}

var nsList []string
for ix := range namespaceList.Items {
nsList = append(nsList, namespaceList.Items[ix].Name)
}

var nsList []string
for ix := range namespaceList.Items {
nsList = append(nsList, namespaceList.Items[ix].Name)
}

// write above namespaces to watch in every deployment
for _, ns := range nsList {
//deploymentList, err := a.OpClient.KubernetesInterface().AppsV1().Deployments(ns).List(metav1.ListOptions{})
deploymentList, err := a.deploymentLister[ns].List(labels.Everything())
log.Debugf("JPEELER: looking at ns %v deployments:%v\n", ns, deploymentList)
if err != nil {
return err, namespaceList.Items
}

// write namespaces to watch in every deployment
for i, _ := range strategyDetailsDeployment.DeploymentSpecs {
deploy := &strategyDetailsDeployment.DeploymentSpecs[i]
for _, deploy := range deploymentList {
originalData, err := json.Marshal(deploy)
if err != nil {
return err, namespaceList.Items
Expand All @@ -409,7 +413,7 @@ func (a *Operator) updateDeploymentAnnotation(op *v1alpha2.OperatorGroup) (error
return err, namespaceList.Items
}

_, err = a.OpClient.KubernetesInterface().AppsV1().Deployments(currentNamespace).Patch(deploy.Name, types.StrategicMergePatchType, patchBytes)
_, err = a.OpClient.KubernetesInterface().AppsV1().Deployments(ns).Patch(deploy.Name, types.StrategicMergePatchType, patchBytes)
if err != nil {
return fmt.Errorf("Deployment update for '%v' failed: %v\n", deploy.Name, err), namespaceList.Items
}
Expand Down
86 changes: 52 additions & 34 deletions pkg/controller/operators/olm/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ import (
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
k8sfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
apiregistrationfake "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake"

Expand Down Expand Up @@ -986,27 +989,28 @@ func TestSyncOperatorGroups(t *testing.T) {

testNS := "test-ns"
aLabel := map[string]string{"app": "matchLabel"}
newStrategySpecWithAnnotation := func(deploymentName string, namespace string) v1alpha1.NamedInstallStrategy {
deploymentFn := func(deploymentName string) []install.StrategyDeploymentSpec {
deploymentSpecs := getStrategyDeploymentSpecs(deploymentName)
for i := range deploymentSpecs {
metav1.SetMetaDataAnnotation(&deploymentSpecs[i].Spec.Template.ObjectMeta, "olm.targetNamespaces", namespace)
}
return deploymentSpecs
}
return getModifiedInstallStrategy(deploymentName, deploymentFn, nil, nil)
}

// newStrategySpecWithAnnotation := func(deploymentName string, namespace string) v1alpha1.NamedInstallStrategy {
// deploymentFn := func(deploymentName string) []install.StrategyDeploymentSpec {
// deploymentSpecs := getStrategyDeploymentSpecs(deploymentName)
// for i := range deploymentSpecs {
// metav1.SetMetaDataAnnotation(&deploymentSpecs[i].Spec.Template.ObjectMeta, "olm.targetNamespaces", namespace)
// }
// return deploymentSpecs
// }
// return getModifiedInstallStrategy(deploymentName, deploymentFn, nil, nil)
// }

tests := []struct {
name string
initialCsvs []runtime.Object
initialCrds []runtime.Object
initialObjs []runtime.Object
initialApis []runtime.Object
namespaces []v1.Namespace
inputGroup v1alpha2.OperatorGroup
expectedStatus v1alpha2.OperatorGroupStatus
expectedCSVs []v1alpha1.ClusterServiceVersion
name string
initialCsvs []runtime.Object
initialCrds []runtime.Object
initialObjs []runtime.Object
initialApis []runtime.Object
namespaces []v1.Namespace
inputGroup v1alpha2.OperatorGroup
expectedStatus v1alpha2.OperatorGroupStatus
expectedAnnotation map[string]string
}{
{
name: "operator group with no matching namespace, no CSVs",
Expand Down Expand Up @@ -1087,6 +1091,9 @@ func TestSyncOperatorGroups(t *testing.T) {
v1alpha1.CSVPhaseSucceeded,
),
},
initialObjs: []runtime.Object{
deployment("csv1-dep1", testNS),
},
inputGroup: v1alpha2.OperatorGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "operator-group-1",
Expand All @@ -1111,16 +1118,8 @@ func TestSyncOperatorGroups(t *testing.T) {
},
LastUpdated: timeNow(),
},
expectedCSVs: []v1alpha1.ClusterServiceVersion{
*csv("csv1",
testNS,
"",
newStrategySpecWithAnnotation("csv1-dep1", testNS),
[]*v1beta1.CustomResourceDefinition{crd("c1", "v1")},
[]*v1beta1.CustomResourceDefinition{},
v1alpha1.CSVPhaseSucceeded,
),
},
//expectedAnnotation: map[string]string{"olm.targetNamespaces": testNS},
expectedAnnotation: map[string]string{"BROKEN": testNS},
},
}

Expand All @@ -1129,16 +1128,35 @@ func TestSyncOperatorGroups(t *testing.T) {
op, err := NewFakeOperator(tc.initialCsvs, tc.initialObjs, tc.initialCrds, tc.initialApis, &install.StrategyResolver{}, tc.namespaces)
require.NoError(t, err)

stopCh := make(chan struct{})
informerFactory := informers.NewSharedInformerFactory(op.OpClient.KubernetesInterface(), 1*time.Second)
deployInformer := informerFactory.Apps().V1().Deployments()
for _, informer := range []cache.SharedIndexInformer{deployInformer.Informer()} {
go informer.Run(stopCh)
}
informerFactory.Start(stopCh)
informerFactory.WaitForCacheSync(stopCh)

err = op.syncOperatorGroups(&tc.inputGroup)
require.NoError(t, err)
assert.Equal(t, tc.expectedStatus, tc.inputGroup.Status)

outCSVs, err := op.GetClient().OperatorsV1alpha1().ClusterServiceVersions(testNS).List(metav1.ListOptions{})
require.NoError(t, err)

if tc.initialCsvs != nil {
assert.Equal(t, tc.expectedCSVs, outCSVs.Items)
if tc.expectedAnnotation != nil {
// assuming CSVs are in correct namespace
for _, ns := range tc.namespaces {
deployments, err := op.deploymentLister[ns.Name].List(labels.Everything())
//deployments, err := op.OpClient.KubernetesInterface().AppsV1().Deployments(ns.Name).List(metav1.ListOptions{})
if err != nil {
t.Fatal(err)
}
t.Logf("JPEELER deployments in test using ns: %v: %v\n", ns.Name, deployments)
for _, deploy := range deployments {
assert.Equal(t, tc.expectedAnnotation, deploy.Spec.Template.Annotations)
}
}
}
close(stopCh)
<-stopCh
})
}
}
Expand Down

0 comments on commit e88a316

Please sign in to comment.