Issue #69 - Auto-sync option in application CRD instance (#83)
* Issue #69 - Auto-sync option in application CRD instance

* Fix possible NPE error in controller

* Correctly handle the situation when a cluster is temporarily down/unreachable by argocd

* Handle the case when argo monitors a cluster which goes offline and recovers
alexmt authored Apr 11, 2018
1 parent 8a90b32 commit 06b6404
Showing 5 changed files with 258 additions and 30 deletions.
3 changes: 3 additions & 0 deletions cmd/argocd-application-controller/main.go
@@ -58,14 +58,17 @@ func newCommand() *cobra.Command {
}
resyncDuration := time.Duration(appResyncPeriod) * time.Second
apiRepoServer := apirepository.NewServer(namespace, kubeClient, appClient)
apiClusterServer := cluster.NewServer(namespace, kubeClient, appClient)
clusterService := cluster.NewServer(namespace, kubeClient, appClient)
appComparator := controller.NewKsonnetAppComparator(clusterService)

appController := controller.NewApplicationController(
namespace,
kubeClient,
appClient,
reposerver.NewRepositoryServerClientset(repoServerAddress),
apiRepoServer,
apiClusterServer,
appComparator,
resyncDuration,
&controllerConfig)
176 changes: 148 additions & 28 deletions controller/controller.go
@@ -6,15 +6,19 @@ import (
"fmt"
"time"

"sync"

"github.com/argoproj/argo-cd/common"
appv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned"
appinformers "github.com/argoproj/argo-cd/pkg/client/informers/externalversions"
"github.com/argoproj/argo-cd/reposerver"
"github.com/argoproj/argo-cd/reposerver/repository"
"github.com/argoproj/argo-cd/server/cluster"
apireposerver "github.com/argoproj/argo-cd/server/repository"
"github.com/argoproj/argo-cd/util"
argoutil "github.com/argoproj/argo-cd/util/argo"
"github.com/argoproj/argo-cd/util/kube"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -23,21 +27,30 @@ import (
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

const (
watchResourcesRetryTimeout = 10 * time.Second
)

// ApplicationController is the controller for application resources.
type ApplicationController struct {
repoClientset reposerver.Clientset
kubeClientset kubernetes.Interface
applicationClientset appclientset.Interface
appQueue workqueue.RateLimitingInterface
appInformer cache.SharedIndexInformer
appComparator AppComparator
statusRefreshTimeout time.Duration
apiRepoService apireposerver.RepositoryServiceServer
namespace string
repoClientset reposerver.Clientset
kubeClientset kubernetes.Interface
applicationClientset appclientset.Interface
appQueue workqueue.RateLimitingInterface
appInformer cache.SharedIndexInformer
appComparator AppComparator
statusRefreshTimeout time.Duration
apiRepoService apireposerver.RepositoryServiceServer
apiClusterService *cluster.Server
forceRefreshApps map[string]bool
forceRefreshAppsMutex *sync.Mutex
}

type ApplicationControllerConfig struct {
@@ -47,24 +60,30 @@ type ApplicationControllerConfig struct {

// NewApplicationController creates new instance of ApplicationController.
func NewApplicationController(
namespace string,
kubeClientset kubernetes.Interface,
applicationClientset appclientset.Interface,
repoClientset reposerver.Clientset,
apiRepoService apireposerver.RepositoryServiceServer,
apiClusterService *cluster.Server,
appComparator AppComparator,
appResyncPeriod time.Duration,
config *ApplicationControllerConfig,
) *ApplicationController {
appQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
return &ApplicationController{
kubeClientset: kubeClientset,
applicationClientset: applicationClientset,
repoClientset: repoClientset,
appQueue: appQueue,
apiRepoService: apiRepoService,
appComparator: appComparator,
appInformer: newApplicationInformer(applicationClientset, appQueue, appResyncPeriod, config),
statusRefreshTimeout: appResyncPeriod,
namespace: namespace,
kubeClientset: kubeClientset,
applicationClientset: applicationClientset,
repoClientset: repoClientset,
appQueue: appQueue,
apiRepoService: apiRepoService,
apiClusterService: apiClusterService,
appComparator: appComparator,
appInformer: newApplicationInformer(applicationClientset, appQueue, appResyncPeriod, config),
statusRefreshTimeout: appResyncPeriod,
forceRefreshApps: make(map[string]bool),
forceRefreshAppsMutex: &sync.Mutex{},
}
}

@@ -74,6 +93,7 @@ func (ctrl *ApplicationController) Run(ctx context.Context, appWorkers int) {
defer ctrl.appQueue.ShutDown()

go ctrl.appInformer.Run(ctx.Done())
go ctrl.watchAppsResources()

if !cache.WaitForCacheSync(ctx.Done(), ctrl.appInformer.HasSynced) {
log.Error("Timed out waiting for caches to sync")
@@ -87,6 +107,94 @@ func (ctrl *ApplicationController) Run(ctx context.Context, appWorkers int) {
<-ctx.Done()
}

func (ctrl *ApplicationController) forceAppRefresh(appName string) {
ctrl.forceRefreshAppsMutex.Lock()
defer ctrl.forceRefreshAppsMutex.Unlock()
ctrl.forceRefreshApps[appName] = true
}

func (ctrl *ApplicationController) isRefreshForced(appName string) bool {
ctrl.forceRefreshAppsMutex.Lock()
defer ctrl.forceRefreshAppsMutex.Unlock()
_, ok := ctrl.forceRefreshApps[appName]
if ok {
delete(ctrl.forceRefreshApps, appName)
}
return ok
}
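
Taken together, forceAppRefresh and isRefreshForced implement a consume-once flag: a watch event marks an application for refresh, and the next worker pass reads and clears the mark. A minimal standalone sketch of the same pattern; the type, names, and demo `main` below are illustrative only and not part of this commit:

```go
package main

import (
	"fmt"
	"sync"
)

// refreshTracker mirrors the controller's forceRefreshApps map plus mutex:
// Set marks an app, Consume reports the mark and clears it.
type refreshTracker struct {
	mu   sync.Mutex
	apps map[string]bool
}

func newRefreshTracker() *refreshTracker {
	return &refreshTracker{apps: make(map[string]bool)}
}

func (t *refreshTracker) Set(app string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.apps[app] = true
}

func (t *refreshTracker) Consume(app string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	_, ok := t.apps[app]
	if ok {
		delete(t.apps, app)
	}
	return ok
}

func main() {
	t := newRefreshTracker()
	t.Set("guestbook")
	fmt.Println(t.Consume("guestbook")) // true: mark is read and cleared
	fmt.Println(t.Consume("guestbook")) // false: later checks fall back to the resync timeout
}
```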

// watchClusterResources watches for resource changes annotated with the application label on the specified cluster and schedules a corresponding app refresh.
func (ctrl *ApplicationController) watchClusterResources(ctx context.Context, item appv1.Cluster) {
config := item.RESTConfig()
retryUntilSucceed(func() error {
ch, err := kube.WatchResourcesWithLabel(ctx, config, "", common.LabelApplicationName)
if err != nil {
return err
}
for event := range ch {
eventObj := event.Object.(*unstructured.Unstructured)
objLabels := eventObj.GetLabels()
if objLabels == nil {
objLabels = make(map[string]string)
}
if appName, ok := objLabels[common.LabelApplicationName]; ok {
ctrl.forceAppRefresh(appName)
ctrl.appQueue.Add(ctrl.namespace + "/" + appName)
}
}
return fmt.Errorf("resource updates channel has closed")
}, fmt.Sprintf("watch app resources on %s", config.Host), ctx, watchResourcesRetryTimeout)

}

// watchAppsResources watches for resource changes annotated with the application label on all registered clusters and schedules a corresponding app refresh.
func (ctrl *ApplicationController) watchAppsResources() {
watchingClusters := make(map[string]context.CancelFunc)

retryUntilSucceed(func() error {
return ctrl.apiClusterService.WatchClusters(context.Background(), func(event *cluster.ClusterEvent) {
cancel, ok := watchingClusters[event.Cluster.Server]
if event.Type == watch.Deleted && ok {
cancel()
delete(watchingClusters, event.Cluster.Server)
} else if event.Type != watch.Deleted && !ok {
ctx, cancel := context.WithCancel(context.Background())
watchingClusters[event.Cluster.Server] = cancel
go ctrl.watchClusterResources(ctx, *event.Cluster)
}
})
}, "watch clusters", context.Background(), watchResourcesRetryTimeout)

<-context.Background().Done()
}
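
watchAppsResources keeps one watcher goroutine per registered cluster and stops it through a stored context.CancelFunc once the cluster is deleted. The sketch below isolates that bookkeeping pattern; the event struct and server URLs are stand-ins for cluster.ClusterEvent and real cluster addresses:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// event is a stand-in for cluster.ClusterEvent: a server URL plus whether it was deleted.
type event struct {
	server  string
	deleted bool
}

func main() {
	watching := make(map[string]context.CancelFunc)

	handle := func(ev event) {
		cancel, ok := watching[ev.server]
		switch {
		case ev.deleted && ok:
			// Cluster removed: stop its watcher and forget it.
			cancel()
			delete(watching, ev.server)
		case !ev.deleted && !ok:
			// New cluster: start a dedicated watcher tied to its own context.
			ctx, cancel := context.WithCancel(context.Background())
			watching[ev.server] = cancel
			go func(server string) {
				<-ctx.Done()
				fmt.Println("stopped watching", server)
			}(ev.server)
		}
	}

	handle(event{server: "https://cluster-a"})
	handle(event{server: "https://cluster-a", deleted: true})
	time.Sleep(100 * time.Millisecond) // give the watcher goroutine time to observe cancellation
}
```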

// retryUntilSucceed keeps retrying the given action with the specified timeout until the action succeeds or the specified context is done.
func retryUntilSucceed(action func() error, desc string, ctx context.Context, timeout time.Duration) {
ctxCompleted := false
go func() {
select {
case <-ctx.Done():
ctxCompleted = true
}
}()
for {
err := action()
if err == nil {
return
}
if err != nil {
if ctxCompleted {
log.Infof("Stop retrying %s", desc)
return
} else {
log.Warnf("Failed to %s: %v, retrying in %v", desc, err, timeout)
time.Sleep(timeout)
}
}

}
}
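
retryUntilSucceed is what lets the controller ride out a cluster or repo server that is temporarily unreachable: the action is retried on a fixed interval until it returns nil or the surrounding context ends. The runnable sketch below uses a trimmed copy of the helper, checking ctx.Err() directly instead of the goroutine-and-flag bookkeeping above, purely to keep the example short:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryUntilSucceed is a trimmed-down copy of the controller's helper, included
// only so this example compiles on its own: retry action every timeout until it
// succeeds or ctx is cancelled.
func retryUntilSucceed(action func() error, desc string, ctx context.Context, timeout time.Duration) {
	for {
		if err := action(); err == nil {
			return
		} else if ctx.Err() != nil {
			fmt.Printf("Stop retrying %s\n", desc)
			return
		} else {
			fmt.Printf("Failed to %s: %v, retrying in %v\n", desc, err, timeout)
			time.Sleep(timeout)
		}
	}
}

func main() {
	attempts := 0
	retryUntilSucceed(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("cluster unreachable") // e.g. the cluster is still coming back up
		}
		return nil
	}, "connect to cluster", context.Background(), 100*time.Millisecond)
	fmt.Println("succeeded after", attempts, "attempts")
}
```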

func (ctrl *ApplicationController) processNextItem() bool {
appKey, shutdown := ctrl.appQueue.Get()
if shutdown {
@@ -110,20 +218,21 @@ func (ctrl *ApplicationController) processNextItem() bool {
return true
}

if app.NeedRefreshAppStatus(ctrl.statusRefreshTimeout) {
updatedApp := app.DeepCopy()
status, err := ctrl.tryRefreshAppStatus(updatedApp)
isForceRefreshed := ctrl.isRefreshForced(app.Name)
if isForceRefreshed || app.NeedRefreshAppStatus(ctrl.statusRefreshTimeout) {
log.Infof("Refreshing application '%s' status (force refreshed: %v)", app.Name, isForceRefreshed)

status, err := ctrl.tryRefreshAppStatus(app.DeepCopy())
if err != nil {
updatedApp.Status.ComparisonResult = appv1.ComparisonResult{
status = app.Status.DeepCopy()
status.ComparisonResult = appv1.ComparisonResult{
Status: appv1.ComparisonStatusError,
Error: fmt.Sprintf("Failed to get application status for application '%s': %v", app.Name, err),
ComparedTo: app.Spec.Source,
ComparedAt: metav1.Time{Time: time.Now().UTC()},
}
} else {
updatedApp.Status = *status
}
ctrl.persistApp(updatedApp)
ctrl.updateAppStatus(app.Name, app.Namespace, status)
}

return true
@@ -205,13 +314,24 @@ func (ctrl *ApplicationController) runWorker() {
}
}

func (ctrl *ApplicationController) persistApp(app *appv1.Application) {
appClient := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.ObjectMeta.Namespace)
_, err := appClient.Update(app)
func (ctrl *ApplicationController) updateAppStatus(appName string, namespace string, status *appv1.ApplicationStatus) {
appKey := fmt.Sprintf("%s/%s", namespace, appName)
obj, exists, err := ctrl.appInformer.GetIndexer().GetByKey(appKey)
if err != nil {
log.Warnf("Error updating application: %v", err)
log.Warnf("Failed to get application '%s' from informer index: %+v", appKey, err)
} else {
if exists {
app := obj.(*appv1.Application).DeepCopy()
app.Status = *status
appClient := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(namespace)
_, err := appClient.Update(app)
if err != nil {
log.Warnf("Error updating application: %v", err)
} else {
log.Info("Application update successful")
}
}
}
log.Info("Application update successful")
}

func newApplicationInformer(
38 changes: 37 additions & 1 deletion server/cluster/cluster.go
@@ -19,6 +19,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)

@@ -30,7 +31,7 @@ type Server struct {
}

// NewServer returns a new instance of the Cluster service
func NewServer(namespace string, kubeclientset kubernetes.Interface, appclientset appclientset.Interface) ClusterServiceServer {
func NewServer(namespace string, kubeclientset kubernetes.Interface, appclientset appclientset.Interface) *Server {
return &Server{
ns: namespace,
appclientset: appclientset,
@@ -93,6 +94,41 @@ func (s *Server) Create(ctx context.Context, c *appv1.Cluster) (*appv1.Cluster,
return secretToCluster(clusterSecret), nil
}

// ClusterEvent contains information about a cluster event
type ClusterEvent struct {
Type watch.EventType
Cluster *appv1.Cluster
}

// WatchClusters allows watching for cluster events
func (s *Server) WatchClusters(ctx context.Context, callback func(*ClusterEvent)) error {
listOpts := metav1.ListOptions{}
labelSelector := labels.NewSelector()
req, err := labels.NewRequirement(common.LabelKeySecretType, selection.Equals, []string{common.SecretTypeCluster})
if err != nil {
return err
}
labelSelector = labelSelector.Add(*req)
listOpts.LabelSelector = labelSelector.String()
w, err := s.kubeclientset.CoreV1().Secrets(s.ns).Watch(listOpts)
if err != nil {
return err
}
go func() {
<-ctx.Done()
w.Stop()
}()
for next := range w.ResultChan() {
secret := next.Object.(*apiv1.Secret)
cluster := secretToCluster(secret)
callback(&ClusterEvent{
Type: next.Type,
Cluster: cluster,
})
}
return nil
}
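
A hedged caller-side sketch of WatchClusters, assuming an already constructed *cluster.Server (clientset wiring elided) and the import path used in this diff: the call blocks until the secret watch channel closes or the passed context is cancelled, invoking the callback once per event.

```go
package example

import (
	"context"
	"log"
	"time"

	"github.com/argoproj/argo-cd/server/cluster"
)

// watchClustersForAWhile subscribes to cluster add/modify/delete events for one
// minute, then cancels the context, which stops the underlying secret watch.
func watchClustersForAWhile(clusterService *cluster.Server) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	if err := clusterService.WatchClusters(ctx, func(event *cluster.ClusterEvent) {
		log.Printf("cluster %s: %s", event.Cluster.Server, event.Type)
	}); err != nil {
		log.Printf("cluster watch ended: %v", err)
	}
}
```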

func (s *Server) getClusterSecret(server string) (*apiv1.Secret, error) {
secName := serverToSecretName(server)
clusterSecret, err := s.kubeclientset.CoreV1().Secrets(s.ns).Get(secName, metav1.GetOptions{})
2 changes: 2 additions & 0 deletions test/e2e/fixture.go
@@ -151,10 +151,12 @@ func (f *Fixture) CreateApp(t *testing.T, application *v1alpha1.Application) *v1
// CreateController creates new controller instance
func (f *Fixture) CreateController() *controller.ApplicationController {
return controller.NewApplicationController(
f.Namespace,
f.KubeClient,
f.AppClient,
reposerver.NewRepositoryServerClientset(f.repoServerListener.Addr().String()),
f.ApiRepoService,
cluster.NewServer(f.Namespace, f.KubeClient, f.AppClient),
f.AppComparator,
time.Second,
&controller.ApplicationControllerConfig{Namespace: f.Namespace, InstanceID: f.InstanceID})
