diff --git a/cmd/argocd-application-controller/main.go b/cmd/argocd-application-controller/main.go index f351c5f1a7fa5..00794d1a24a84 100644 --- a/cmd/argocd-application-controller/main.go +++ b/cmd/argocd-application-controller/main.go @@ -41,16 +41,19 @@ func newCommand() *cobra.Command { appClient := appclientset.NewForConfigOrDie(kubeConfig) // TODO (amatyushentsev): Use config map to store controller configuration - namespace := "default" + config := controller.ApplicationControllerConfig{ + Namespace: "default", + InstanceID: "", + } appResyncPeriod := time.Minute * 10 appManager := application.NewAppManager( nativeGitClient, - repository.NewServer(namespace, kubeClient, appClient), + repository.NewServer(config.Namespace, kubeClient, appClient), application.NewKsonnetAppComparator(), appResyncPeriod) - appController := controller.NewApplicationController(kubeClient, appClient, appManager, appResyncPeriod) + appController := controller.NewApplicationController(kubeClient, appClient, appManager, appResyncPeriod, &config) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/common/common.go b/common/common.go index 074c22116e1f7..5fabbb2b36bb8 100644 --- a/common/common.go +++ b/common/common.go @@ -1,5 +1,7 @@ package common +import "github.com/argoproj/argo-cd/pkg/apis/application" + const ( // MetadataPrefix is the prefix used for our labels and annotations MetadataPrefix = "argocd.argoproj.io" @@ -23,4 +25,6 @@ var ( // LabelKeySecretType contains the type of argocd secret (either 'cluster' or 'repo') LabelKeySecretType = MetadataPrefix + "/secret-type" + // LabelKeyApplicationControllerInstanceID is the label which allows to separate application among multiple running application controllers. + LabelKeyApplicationControllerInstanceID = application.ApplicationFullName + "/controller-instanceid" ) diff --git a/controller/controller.go b/controller/controller.go index ac95fa0c11406..d18f568ad15e4 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -3,6 +3,7 @@ package controller import ( "context" + "github.com/argoproj/argo-cd/common" appv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1" appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned" appinformers "github.com/argoproj/argo-cd/pkg/client/informers/externalversions" @@ -11,6 +12,10 @@ import ( "time" "github.com/argoproj/argo-cd/application" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" @@ -26,6 +31,12 @@ type ApplicationController struct { applicationClientset appclientset.Interface appQueue workqueue.RateLimitingInterface appInformer cache.SharedIndexInformer + config *ApplicationControllerConfig +} + +type ApplicationControllerConfig struct { + InstanceID string + Namespace string } // NewApplicationController creates new instance of ApplicationController. @@ -33,14 +44,15 @@ func NewApplicationController( kubeClientset kubernetes.Interface, applicationClientset appclientset.Interface, appManager *application.Manager, - appResyncPeriod time.Duration) *ApplicationController { + appResyncPeriod time.Duration, + config *ApplicationControllerConfig) *ApplicationController { appQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) return &ApplicationController{ appManager: appManager, kubeClientset: kubeClientset, applicationClientset: applicationClientset, appQueue: appQueue, - appInformer: newApplicationInformer(applicationClientset, appQueue, appResyncPeriod), + appInformer: newApplicationInformer(applicationClientset, appQueue, appResyncPeriod, config), } } @@ -109,10 +121,29 @@ func (ctrl *ApplicationController) persistApp(app *appv1.Application) { log.Info("Application update successful") } -func newApplicationInformer(appClientset appclientset.Interface, appQueue workqueue.RateLimitingInterface, appResyncPeriod time.Duration) cache.SharedIndexInformer { - appInformerFactory := appinformers.NewSharedInformerFactory( +func newApplicationInformer( + appClientset appclientset.Interface, appQueue workqueue.RateLimitingInterface, appResyncPeriod time.Duration, config *ApplicationControllerConfig) cache.SharedIndexInformer { + + appInformerFactory := appinformers.NewFilteredSharedInformerFactory( appClientset, appResyncPeriod, + config.Namespace, + func(options *metav1.ListOptions) { + var instanceIDReq *labels.Requirement + var err error + if config.InstanceID != "" { + instanceIDReq, err = labels.NewRequirement(common.LabelKeyApplicationControllerInstanceID, selection.Equals, []string{config.InstanceID}) + } else { + instanceIDReq, err = labels.NewRequirement(common.LabelKeyApplicationControllerInstanceID, selection.DoesNotExist, nil) + } + if err != nil { + panic(err) + } + + options.FieldSelector = fields.Everything().String() + labelSelector := labels.NewSelector().Add(*instanceIDReq) + options.LabelSelector = labelSelector.String() + }, ) informer := appInformerFactory.Argoproj().V1alpha1().Applications().Informer() informer.AddEventHandler(