Skip to content

Commit

Permalink
feat(multi-cluster k8s dashboard): Make necessary changes for multi-c…
Browse files Browse the repository at this point in the history
…luster k8s dashboard feature in AKP (argoproj#48)

* feat: send data to agent

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>

* (wip)feat: send data to syncer APIs

* add support for columns info

* fix: resource deletion missing events

* feat: use resource sync and health info

* chore: disable dependabot

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>

* feat: user server side filtering & pagination to load Argo CD applications list

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>

* fix: fix cluster filtering and refresh button on application list page

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>

* fix: api server fails with "nil pointer dereference" error (argoproj#40)

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>

* fix(ui): ui should show the same page number as in API (argoproj#38)

* feat: repository/Service.ListRefs returns raw list of refs (argoproj#42)

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>

* fix: avoid closing search dropdown while typing; fix cluster filter (argoproj#43)

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>

* fix(ui): Reoreding filters (argoproj#44)

* fix(ui): Reoreding filters

* fix

* fix: -ak.19 server version causing argocd app wait command to hang (argoproj#45)

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>

* feat: enable dashboard feature by using env var

* fix: 1.21.0 -> 1.21

* chore: disable dependabot

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>

* feat: user server side filtering & pagination to load Argo CD applications list

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>

* fix: fix cluster filtering and refresh button on application list page

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>

* fix: api server fails with "nil pointer dereference" error (argoproj#40)

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>

* fix(ui): ui should show the same page number as in API (argoproj#38)

* feat: repository/Service.ListRefs returns raw list of refs (argoproj#42)

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>

* fix: avoid closing search dropdown while typing; fix cluster filter (argoproj#43)

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>

* fix(ui): Reoreding filters (argoproj#44)

* fix(ui): Reoreding filters

* fix

* fix: -ak.19 server version causing argocd app wait command to hang (argoproj#45)

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>

---------

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>
Co-authored-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>
Co-authored-by: Mayursinh Sarvaiya <marvinduff97@gmail.com>
Co-authored-by: Rafal <rafpelczar@gmail.com>
  • Loading branch information
4 people authored and Akuity committed Nov 21, 2024
1 parent 57c81b5 commit 18119f1
Show file tree
Hide file tree
Showing 5 changed files with 374 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ manifests: test-tools-image
# consolidated binary for cli, util, server, repo-server, controller
.PHONY: argocd-all
argocd-all: clean-debug
CGO_ENABLED=${CGO_FLAG} GOOS=${GOOS} GOARCH=${GOARCH} GODEBUG="tarinsecurepath=0,zipinsecurepath=0" go build -v -ldflags '${LDFLAGS}' -o ${DIST_DIR}/${BIN_NAME} ./cmd
CGO_ENABLED=${CGO_FLAG} GOOS=${GOOS} GOARCH=${GOARCH} GODEBUG="tarinsecurepath=0,zipinsecurepath=0" go build -v -ldflags '${LDFLAGS}' -tags akengine -o ${DIST_DIR}/${BIN_NAME} ./cmd

.PHONY: server
server: clean-debug
Expand Down
15 changes: 15 additions & 0 deletions controller/cache/ak-dashboard/ak-engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//go:build akengine

package ak_dashboard

import (
"os"

clustercache "github.com/argoproj/gitops-engine/pkg/cache"
)

func init() {
clusterCacheSetReplaceGKHandler = clustercache.SetReplaceGKHandler

dashboardEnabled = os.Getenv("AK_DASHBOARD_ENABLED") == "true"
}
205 changes: 205 additions & 0 deletions controller/cache/ak-dashboard/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package ak_dashboard

import (
"bytes"
"context"
"net/http"
"sync"
"time"

"github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
clustercache "github.com/argoproj/gitops-engine/pkg/cache"
"github.com/argoproj/gitops-engine/pkg/health"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
log "github.com/sirupsen/logrus"
apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

var (
clusterCacheSetReplaceGKHandler func(handler func(gk schema.GroupKind) func()) clustercache.UpdateSettingsFunc
dashboardEnabled bool

collectClusterInfoInterval = 10 * time.Minute

httpClientTimeout = 30 * time.Second

clusterInfoURL = "http://localhost:8002/k8s-info"
resourceURL = "http://localhost:8002/k8s-resources"
)

type Processor interface {
OnResourceUpdated(res *unstructured.Unstructured, appName string, override health.HealthOverride)
OnResourceDeleted(res *clustercache.Resource, override health.HealthOverride)
GetCacheSettings(override health.HealthOverride) []clustercache.UpdateSettingsFunc
StartInfoCollector(cache clustercache.ClusterCache)
}

type gkEvents struct {
replaceGk *schema.GroupKind
deleted []*clustercache.Resource
updated []struct {
appName string
res *unstructured.Unstructured
}
}

var stopPrevCollector func()

type akProcessor struct {
replacingGk sync.Map
appListener v1alpha1.ApplicationLister
extensionsClient *apiextensions.ApiextensionsV1Client
appsNs string
initAppsNs sync.Mutex
client *http.Client
}

func (p *akProcessor) getAppsNsLister() v1alpha1.ApplicationNamespaceLister {
if p.appsNs == "" {
p.initAppsNs.Lock()
if apps, err := p.appListener.List(labels.Everything()); err == nil && len(apps) > 0 {
p.appsNs = apps[0].Namespace
}
p.initAppsNs.Unlock()
}
return p.appListener.Applications(p.appsNs)
}

func (p *akProcessor) sendEvents(events *gkEvents, override health.HealthOverride) {
resources := NewResourceEvent(*events, p.getAppsNsLister(), override)
jsonData, err := resources.Marshal()
if err != nil {
log.Errorf("failed to marshal resource events: %v", err)
}
log.Debug("sending resource events", string(jsonData))
resp, err := http.Post(resourceURL, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
log.Errorf("failed to send resource events: %v", err)
return
}
defer resp.Body.Close()
}

func (p *akProcessor) sendClusterInfo(info clustercache.ClusterInfo) {
crds, err := p.extensionsClient.CustomResourceDefinitions().List(context.Background(), v1.ListOptions{})
if err != nil {
log.Errorf("failed to list CRDs: %v", err)
}
clusterInfo := NewClusterInfo(info, crds)
jsonData, err := clusterInfo.Marshal()
log.Debug("sending resource events", string(jsonData))
if err != nil {
log.Errorf("failed to marshal cluster info: %v", err)
return
}
resp, err := http.Post(clusterInfoURL, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
log.Errorf("failed to send cluster info: %v", err)
return
}
defer resp.Body.Close()
}

func (p *akProcessor) StartInfoCollector(cache clustercache.ClusterCache) {
if stopPrevCollector != nil {
stopPrevCollector()
}
ctx, cancel := context.WithCancel(context.Background())
stopPrevCollector = cancel

ticker := time.NewTicker(collectClusterInfoInterval)
defer ticker.Stop()
go func() {
if err := cache.EnsureSynced(); err == nil {
p.sendClusterInfo(cache.GetClusterInfo())
}
for {
select {
case <-ticker.C:
p.sendClusterInfo(cache.GetClusterInfo())
case <-ctx.Done():
}
}
}()
}

func (p *akProcessor) OnResourceDeleted(res *clustercache.Resource, override health.HealthOverride) {
if val, isReplacing := p.replacingGk.Load(res.ResourceKey().GroupKind()); isReplacing {
events := val.(*gkEvents)
events.deleted = append(events.deleted, res)
} else {
p.sendEvents(&gkEvents{deleted: []*clustercache.Resource{res}}, override)
}
}

func (p *akProcessor) OnResourceUpdated(res *unstructured.Unstructured, appName string, override health.HealthOverride) {
item := struct {
appName string
res *unstructured.Unstructured
}{
appName: appName,
res: res,
}
if val, isReplacing := p.replacingGk.Load(kube.GetResourceKey(res).GroupKind()); isReplacing {
events := val.(*gkEvents)
events.updated = append(events.updated, item)
} else {
p.sendEvents(&gkEvents{updated: []struct {
appName string
res *unstructured.Unstructured
}{item}}, override)
}
}

func (p *akProcessor) GetCacheSettings(override health.HealthOverride) []clustercache.UpdateSettingsFunc {
return []clustercache.UpdateSettingsFunc{clusterCacheSetReplaceGKHandler(func(gk schema.GroupKind) func() {
p.replacingGk.Store(gk, &gkEvents{replaceGk: &gk})
return func() {
if val, loaded := p.replacingGk.LoadAndDelete(gk); loaded {
p.sendEvents(val.(*gkEvents), override)
}
}
})}
}

func NewProcessor(appInformer cache.SharedIndexInformer, config *rest.Config) (Processor, error) {
log.Infof("AK Dashboard enabled: %v", dashboardEnabled)
if !dashboardEnabled {
return &noopProcessor{}, nil
}

extensionsClient, err := apiextensions.NewForConfig(config)
if err != nil {
return nil, err
}

return &akProcessor{
client: &http.Client{
Timeout: httpClientTimeout,
},
appListener: v1alpha1.NewApplicationLister(appInformer.GetIndexer()),
extensionsClient: extensionsClient,
}, nil
}

type noopProcessor struct {
}

func (n noopProcessor) StartInfoCollector(cache clustercache.ClusterCache) {
}

func (n noopProcessor) OnResourceUpdated(res *unstructured.Unstructured, appName string, override health.HealthOverride) {
}

func (n noopProcessor) OnResourceDeleted(res *clustercache.Resource, override health.HealthOverride) {
}

func (n noopProcessor) GetCacheSettings(override health.HealthOverride) []clustercache.UpdateSettingsFunc {
return nil
}
141 changes: 141 additions & 0 deletions controller/cache/ak-dashboard/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package ak_dashboard

import (
"encoding/json"

"github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
clustercache "github.com/argoproj/gitops-engine/pkg/cache"
"github.com/argoproj/gitops-engine/pkg/health"
extensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
)

type ClusterInfo struct {
K8sVersion string `json:"k8sVersion,omitempty"`
APIResources []APIResource `json:"apiResources,omitempty"`
}

type APIResource struct {
Group string `json:"group,omitempty"`
Version string `json:"version,omitempty"`
Kind string `json:"kind,omitempty"`
Columns []Column `json:"columns,omitempty"`
}

type Column struct {
Name string `json:"name,omitempty"`
JSONPath string `json:"jsonPath,omitempty"`
}

func NewClusterInfo(info clustercache.ClusterInfo, crds *extensionsv1.CustomResourceDefinitionList) ClusterInfo {
columnsByGVK := map[v1.GroupVersionKind][]Column{}
for _, crd := range crds.Items {
for _, v := range crd.Spec.Versions {
columns := make([]Column, 0, len(v.AdditionalPrinterColumns))
for _, col := range v.AdditionalPrinterColumns {
columns = append(columns, Column{
Name: col.Name,
JSONPath: col.JSONPath,
})
}
columnsByGVK[v1.GroupVersionKind{Group: crd.Spec.Group, Kind: crd.Spec.Names.Kind, Version: v.Name}] = columns
}
}

apiResources := make([]APIResource, 0, len(info.APIResources))
for _, resource := range info.APIResources {
apiResources = append(apiResources, APIResource{
Group: resource.GroupKind.Group,
Version: resource.GroupVersionResource.Version,
Kind: resource.GroupKind.Kind,
Columns: columnsByGVK[v1.GroupVersionKind{Group: resource.GroupKind.Group, Kind: resource.GroupKind.Kind, Version: resource.GroupVersionResource.Version}],
})
}

return ClusterInfo{
K8sVersion: info.K8SVersion,
APIResources: apiResources,
}
}

func (c *ClusterInfo) Marshal() ([]byte, error) {
return json.Marshal(c)
}

type ResourceEvents struct {
ReplaceGK *schema.GroupKind `json:"replaceGk,omitempty"`
DeletedResources []unstructured.Unstructured `json:"deletedResources,omitempty"`
UpdatedResources []UpdatedResource `json:"updatedResources,omitempty"`
}

type UpdatedResource struct {
ApplicationInfo ApplicationInfo `json:"applicationInfo,omitempty"`
Resource unstructured.Unstructured `json:"resource,omitempty"`
}

type ApplicationInfo struct {
Name string `json:"name,omitempty"`
SyncStatus string `json:"syncStatus,omitempty"`
HealthStatus string `json:"healthStatus,omitempty"`
}

func NewResourceEvent(events gkEvents, appLister v1alpha1.ApplicationNamespaceLister, override health.HealthOverride) ResourceEvents {
resourceEvents := ResourceEvents{
ReplaceGK: events.replaceGk,
}
deletedObjs := make([]unstructured.Unstructured, 0, len(events.deleted))
for _, res := range events.deleted {
ref := res.Ref
obj := unstructured.Unstructured{}
obj.SetUID(ref.UID)
obj.SetName(ref.Name)
obj.SetNamespace(ref.Namespace)
obj.SetAPIVersion(ref.APIVersion)
obj.SetKind(ref.Kind)
deletedObjs = append(deletedObjs, obj)
}
resourceEvents.DeletedResources = deletedObjs
updatedObjs := make([]UpdatedResource, 0, len(events.updated))
for _, event := range events.updated {
if event.res == nil {
continue
}
applicationInfo := ApplicationInfo{
Name: event.appName,
}
if event.appName != "" {
if app, err := appLister.Get(event.appName); err == nil {
for _, res := range app.Status.Resources {
if res.Group == event.res.GroupVersionKind().Group &&
res.Kind == event.res.GetKind() &&
res.Version == event.res.GroupVersionKind().Version &&
res.Name == event.res.GetName() &&
res.Namespace == event.res.GetNamespace() {
applicationInfo.SyncStatus = string(res.Status)
if res.Health != nil {
applicationInfo.HealthStatus = string(res.Health.Status)
} else {
healthStatus, _ := health.GetResourceHealth(event.res, override)
if healthStatus != nil {
applicationInfo.HealthStatus = string(healthStatus.Status)
}
}
break
}
}
}
}
updatedObjs = append(updatedObjs, UpdatedResource{
ApplicationInfo: applicationInfo,
Resource: *event.res,
})
}
resourceEvents.UpdatedResources = updatedObjs
return resourceEvents
}

func (r *ResourceEvents) Marshal() ([]byte, error) {
return json.Marshal(r)
}
Loading

0 comments on commit 18119f1

Please sign in to comment.