diff --git a/Makefile b/Makefile index c1ef27163cc60..522204f918d34 100644 --- a/Makefile +++ b/Makefile @@ -288,7 +288,7 @@ manifests: test-tools-image # consolidated binary for cli, util, server, repo-server, controller .PHONY: argocd-all argocd-all: clean-debug - CGO_ENABLED=${CGO_FLAG} GOOS=${GOOS} GOARCH=${GOARCH} GODEBUG="tarinsecurepath=0,zipinsecurepath=0" go build -v -ldflags '${LDFLAGS}' -o ${DIST_DIR}/${BIN_NAME} ./cmd + CGO_ENABLED=${CGO_FLAG} GOOS=${GOOS} GOARCH=${GOARCH} GODEBUG="tarinsecurepath=0,zipinsecurepath=0" go build -v -ldflags '${LDFLAGS}' -tags akengine -o ${DIST_DIR}/${BIN_NAME} ./cmd .PHONY: server server: clean-debug diff --git a/controller/cache/ak-dashboard/ak-engine.go b/controller/cache/ak-dashboard/ak-engine.go new file mode 100644 index 0000000000000..10748e165dd97 --- /dev/null +++ b/controller/cache/ak-dashboard/ak-engine.go @@ -0,0 +1,15 @@ +//go:build akengine + +package ak_dashboard + +import ( + "os" + + clustercache "github.com/argoproj/gitops-engine/pkg/cache" +) + +func init() { + clusterCacheSetReplaceGKHandler = clustercache.SetReplaceGKHandler + + dashboardEnabled = os.Getenv("AK_DASHBOARD_ENABLED") == "true" +} diff --git a/controller/cache/ak-dashboard/processor.go b/controller/cache/ak-dashboard/processor.go new file mode 100644 index 0000000000000..7ecb5a37bebfe --- /dev/null +++ b/controller/cache/ak-dashboard/processor.go @@ -0,0 +1,205 @@ +package ak_dashboard + +import ( + "bytes" + "context" + "net/http" + "sync" + "time" + + "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" + clustercache "github.com/argoproj/gitops-engine/pkg/cache" + "github.com/argoproj/gitops-engine/pkg/health" + "github.com/argoproj/gitops-engine/pkg/utils/kube" + log "github.com/sirupsen/logrus" + apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" +) + +var ( + clusterCacheSetReplaceGKHandler func(handler func(gk schema.GroupKind) func()) clustercache.UpdateSettingsFunc + dashboardEnabled bool + + collectClusterInfoInterval = 10 * time.Minute + + httpClientTimeout = 30 * time.Second + + clusterInfoURL = "http://localhost:8002/k8s-info" + resourceURL = "http://localhost:8002/k8s-resources" +) + +type Processor interface { + OnResourceUpdated(res *unstructured.Unstructured, appName string, override health.HealthOverride) + OnResourceDeleted(res *clustercache.Resource, override health.HealthOverride) + GetCacheSettings(override health.HealthOverride) []clustercache.UpdateSettingsFunc + StartInfoCollector(cache clustercache.ClusterCache) +} + +type gkEvents struct { + replaceGk *schema.GroupKind + deleted []*clustercache.Resource + updated []struct { + appName string + res *unstructured.Unstructured + } +} + +var stopPrevCollector func() + +type akProcessor struct { + replacingGk sync.Map + appListener v1alpha1.ApplicationLister + extensionsClient *apiextensions.ApiextensionsV1Client + appsNs string + initAppsNs sync.Mutex + client *http.Client +} + +func (p *akProcessor) getAppsNsLister() v1alpha1.ApplicationNamespaceLister { + if p.appsNs == "" { + p.initAppsNs.Lock() + if apps, err := p.appListener.List(labels.Everything()); err == nil && len(apps) > 0 { + p.appsNs = apps[0].Namespace + } + p.initAppsNs.Unlock() + } + return p.appListener.Applications(p.appsNs) +} + +func (p *akProcessor) sendEvents(events *gkEvents, override health.HealthOverride) { + resources := NewResourceEvent(*events, p.getAppsNsLister(), override) + jsonData, err := resources.Marshal() + if err != nil { + log.Errorf("failed to marshal resource events: %v", err) + } + log.Debug("sending resource events", string(jsonData)) + resp, err := http.Post(resourceURL, "application/json", bytes.NewBuffer(jsonData)) + if err != nil { + log.Errorf("failed to send resource events: %v", err) + return + } + defer resp.Body.Close() +} + +func (p *akProcessor) sendClusterInfo(info clustercache.ClusterInfo) { + crds, err := p.extensionsClient.CustomResourceDefinitions().List(context.Background(), v1.ListOptions{}) + if err != nil { + log.Errorf("failed to list CRDs: %v", err) + } + clusterInfo := NewClusterInfo(info, crds) + jsonData, err := clusterInfo.Marshal() + log.Debug("sending resource events", string(jsonData)) + if err != nil { + log.Errorf("failed to marshal cluster info: %v", err) + return + } + resp, err := http.Post(clusterInfoURL, "application/json", bytes.NewBuffer(jsonData)) + if err != nil { + log.Errorf("failed to send cluster info: %v", err) + return + } + defer resp.Body.Close() +} + +func (p *akProcessor) StartInfoCollector(cache clustercache.ClusterCache) { + if stopPrevCollector != nil { + stopPrevCollector() + } + ctx, cancel := context.WithCancel(context.Background()) + stopPrevCollector = cancel + + ticker := time.NewTicker(collectClusterInfoInterval) + defer ticker.Stop() + go func() { + if err := cache.EnsureSynced(); err == nil { + p.sendClusterInfo(cache.GetClusterInfo()) + } + for { + select { + case <-ticker.C: + p.sendClusterInfo(cache.GetClusterInfo()) + case <-ctx.Done(): + } + } + }() +} + +func (p *akProcessor) OnResourceDeleted(res *clustercache.Resource, override health.HealthOverride) { + if val, isReplacing := p.replacingGk.Load(res.ResourceKey().GroupKind()); isReplacing { + events := val.(*gkEvents) + events.deleted = append(events.deleted, res) + } else { + p.sendEvents(&gkEvents{deleted: []*clustercache.Resource{res}}, override) + } +} + +func (p *akProcessor) OnResourceUpdated(res *unstructured.Unstructured, appName string, override health.HealthOverride) { + item := struct { + appName string + res *unstructured.Unstructured + }{ + appName: appName, + res: res, + } + if val, isReplacing := p.replacingGk.Load(kube.GetResourceKey(res).GroupKind()); isReplacing { + events := val.(*gkEvents) + events.updated = append(events.updated, item) + } else { + p.sendEvents(&gkEvents{updated: []struct { + appName string + res *unstructured.Unstructured + }{item}}, override) + } +} + +func (p *akProcessor) GetCacheSettings(override health.HealthOverride) []clustercache.UpdateSettingsFunc { + return []clustercache.UpdateSettingsFunc{clusterCacheSetReplaceGKHandler(func(gk schema.GroupKind) func() { + p.replacingGk.Store(gk, &gkEvents{replaceGk: &gk}) + return func() { + if val, loaded := p.replacingGk.LoadAndDelete(gk); loaded { + p.sendEvents(val.(*gkEvents), override) + } + } + })} +} + +func NewProcessor(appInformer cache.SharedIndexInformer, config *rest.Config) (Processor, error) { + log.Infof("AK Dashboard enabled: %v", dashboardEnabled) + if !dashboardEnabled { + return &noopProcessor{}, nil + } + + extensionsClient, err := apiextensions.NewForConfig(config) + if err != nil { + return nil, err + } + + return &akProcessor{ + client: &http.Client{ + Timeout: httpClientTimeout, + }, + appListener: v1alpha1.NewApplicationLister(appInformer.GetIndexer()), + extensionsClient: extensionsClient, + }, nil +} + +type noopProcessor struct { +} + +func (n noopProcessor) StartInfoCollector(cache clustercache.ClusterCache) { +} + +func (n noopProcessor) OnResourceUpdated(res *unstructured.Unstructured, appName string, override health.HealthOverride) { +} + +func (n noopProcessor) OnResourceDeleted(res *clustercache.Resource, override health.HealthOverride) { +} + +func (n noopProcessor) GetCacheSettings(override health.HealthOverride) []clustercache.UpdateSettingsFunc { + return nil +} diff --git a/controller/cache/ak-dashboard/types.go b/controller/cache/ak-dashboard/types.go new file mode 100644 index 0000000000000..ea8625a646b16 --- /dev/null +++ b/controller/cache/ak-dashboard/types.go @@ -0,0 +1,141 @@ +package ak_dashboard + +import ( + "encoding/json" + + "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" + clustercache "github.com/argoproj/gitops-engine/pkg/cache" + "github.com/argoproj/gitops-engine/pkg/health" + extensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +type ClusterInfo struct { + K8sVersion string `json:"k8sVersion,omitempty"` + APIResources []APIResource `json:"apiResources,omitempty"` +} + +type APIResource struct { + Group string `json:"group,omitempty"` + Version string `json:"version,omitempty"` + Kind string `json:"kind,omitempty"` + Columns []Column `json:"columns,omitempty"` +} + +type Column struct { + Name string `json:"name,omitempty"` + JSONPath string `json:"jsonPath,omitempty"` +} + +func NewClusterInfo(info clustercache.ClusterInfo, crds *extensionsv1.CustomResourceDefinitionList) ClusterInfo { + columnsByGVK := map[v1.GroupVersionKind][]Column{} + for _, crd := range crds.Items { + for _, v := range crd.Spec.Versions { + columns := make([]Column, 0, len(v.AdditionalPrinterColumns)) + for _, col := range v.AdditionalPrinterColumns { + columns = append(columns, Column{ + Name: col.Name, + JSONPath: col.JSONPath, + }) + } + columnsByGVK[v1.GroupVersionKind{Group: crd.Spec.Group, Kind: crd.Spec.Names.Kind, Version: v.Name}] = columns + } + } + + apiResources := make([]APIResource, 0, len(info.APIResources)) + for _, resource := range info.APIResources { + apiResources = append(apiResources, APIResource{ + Group: resource.GroupKind.Group, + Version: resource.GroupVersionResource.Version, + Kind: resource.GroupKind.Kind, + Columns: columnsByGVK[v1.GroupVersionKind{Group: resource.GroupKind.Group, Kind: resource.GroupKind.Kind, Version: resource.GroupVersionResource.Version}], + }) + } + + return ClusterInfo{ + K8sVersion: info.K8SVersion, + APIResources: apiResources, + } +} + +func (c *ClusterInfo) Marshal() ([]byte, error) { + return json.Marshal(c) +} + +type ResourceEvents struct { + ReplaceGK *schema.GroupKind `json:"replaceGk,omitempty"` + DeletedResources []unstructured.Unstructured `json:"deletedResources,omitempty"` + UpdatedResources []UpdatedResource `json:"updatedResources,omitempty"` +} + +type UpdatedResource struct { + ApplicationInfo ApplicationInfo `json:"applicationInfo,omitempty"` + Resource unstructured.Unstructured `json:"resource,omitempty"` +} + +type ApplicationInfo struct { + Name string `json:"name,omitempty"` + SyncStatus string `json:"syncStatus,omitempty"` + HealthStatus string `json:"healthStatus,omitempty"` +} + +func NewResourceEvent(events gkEvents, appLister v1alpha1.ApplicationNamespaceLister, override health.HealthOverride) ResourceEvents { + resourceEvents := ResourceEvents{ + ReplaceGK: events.replaceGk, + } + deletedObjs := make([]unstructured.Unstructured, 0, len(events.deleted)) + for _, res := range events.deleted { + ref := res.Ref + obj := unstructured.Unstructured{} + obj.SetUID(ref.UID) + obj.SetName(ref.Name) + obj.SetNamespace(ref.Namespace) + obj.SetAPIVersion(ref.APIVersion) + obj.SetKind(ref.Kind) + deletedObjs = append(deletedObjs, obj) + } + resourceEvents.DeletedResources = deletedObjs + updatedObjs := make([]UpdatedResource, 0, len(events.updated)) + for _, event := range events.updated { + if event.res == nil { + continue + } + applicationInfo := ApplicationInfo{ + Name: event.appName, + } + if event.appName != "" { + if app, err := appLister.Get(event.appName); err == nil { + for _, res := range app.Status.Resources { + if res.Group == event.res.GroupVersionKind().Group && + res.Kind == event.res.GetKind() && + res.Version == event.res.GroupVersionKind().Version && + res.Name == event.res.GetName() && + res.Namespace == event.res.GetNamespace() { + applicationInfo.SyncStatus = string(res.Status) + if res.Health != nil { + applicationInfo.HealthStatus = string(res.Health.Status) + } else { + healthStatus, _ := health.GetResourceHealth(event.res, override) + if healthStatus != nil { + applicationInfo.HealthStatus = string(healthStatus.Status) + } + } + break + } + } + } + } + updatedObjs = append(updatedObjs, UpdatedResource{ + ApplicationInfo: applicationInfo, + Resource: *event.res, + }) + } + resourceEvents.UpdatedResources = updatedObjs + return resourceEvents +} + +func (r *ResourceEvents) Marshal() ([]byte, error) { + return json.Marshal(r) +} diff --git a/controller/cache/cache.go b/controller/cache/cache.go index b17afbca5234b..b3e858b2f08e6 100644 --- a/controller/cache/cache.go +++ b/controller/cache/cache.go @@ -15,6 +15,7 @@ import ( "syscall" "time" + "github.com/argoproj/argo-cd/v2/controller/cache/ak-dashboard" clustercache "github.com/argoproj/gitops-engine/pkg/cache" "github.com/argoproj/gitops-engine/pkg/health" "github.com/argoproj/gitops-engine/pkg/utils/kube" @@ -511,6 +512,10 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e if log.GetLevel() < log.DebugLevel { clusterCacheConfig.WarningHandler = rest.NoWarnings{} } + dashboardProcessor, err := ak_dashboard.NewProcessor(c.appInformer, clusterCacheConfig) + if err != nil { + return nil, fmt.Errorf("error creating dashboard processor: %w", err) + } clusterCacheOpts := []clustercache.UpdateSettingsFunc{ clustercache.SetListSemaphore(semaphore.NewWeighted(clusterCacheListSemaphoreSize)), @@ -546,6 +551,7 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e res.manifestHash = hash } } + dashboardProcessor.OnResourceUpdated(un, appName, cacheSettings.clusterSettings.ResourceHealthOverride) // edge case. we do not label CRDs, so they miss the tracking label we inject. But we still // want the full resource to be available in our cache (to diff), so we store all CRDs @@ -556,7 +562,10 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e clustercache.SetRespectRBAC(respectRBAC), } + clusterCacheOpts = append(clusterCacheOpts, dashboardProcessor.GetCacheSettings(cacheSettings.clusterSettings.ResourceHealthOverride)...) + clusterCache = clustercache.NewClusterCache(clusterCacheConfig, clusterCacheOpts...) + dashboardProcessor.StartInfoCollector(clusterCache) _ = clusterCache.OnResourceUpdated(func(newRes *clustercache.Resource, oldRes *clustercache.Resource, namespaceResources map[kube.ResourceKey]*clustercache.Resource) { toNotify := make(map[string]bool) @@ -601,6 +610,9 @@ func (c *liveStateCache) getCluster(server string) (clustercache.ClusterCache, e toNotify[app] = isRootAppNode(r) || toNotify[app] } c.onObjectUpdated(toNotify, ref) + if newRes == nil { + dashboardProcessor.OnResourceDeleted(oldRes, cacheSettings.clusterSettings.ResourceHealthOverride) + } }) _ = clusterCache.OnEvent(func(event watch.EventType, un *unstructured.Unstructured) {