Skip to content

Commit

Permalink
WIP2
Browse files Browse the repository at this point in the history
  • Loading branch information
karlkfi committed Jul 13, 2024
1 parent 2fd62dd commit bb9361f
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 20 deletions.
10 changes: 6 additions & 4 deletions e2e/testcases/multi_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ func TestConflictingDefinitions_RootToRoot(t *testing.T) {
ru := roleUpdate{
ResourceVersion: o.GetResourceVersion(),
ApplySet: core.GetLabel(o, kubectlapply.ApplysetPartOfLabel),
Manager: core.GetAnnotation(o, metadata.ResourceManagerKey),
}
roleUpdates = append(roleUpdates, ru)
nt.T.Logf("Role update: %#v", ru)
Expand Down Expand Up @@ -673,15 +674,16 @@ func TestConflictingDefinitions_RootToRoot(t *testing.T) {
type roleUpdate struct {
ResourceVersion string
ApplySet string
Manager string
}

// reconcilerConflictObserved returns true if the role updates show more than one apply set
// reconcilerConflictObserved returns true if the role updates show more than one manager
func reconcilerConflictObserved(updates []roleUpdate) bool {
applySetIDs := map[string]struct{}{}
managers := map[string]struct{}{}
for _, update := range updates {
applySetIDs[update.ApplySet] = struct{}{}
managers[update.Manager] = struct{}{}
}
return len(applySetIDs) > 1
return len(managers) > 1
}

func TestConflictingDefinitions_NamespaceToNamespace(t *testing.T) {
Expand Down
29 changes: 25 additions & 4 deletions pkg/parse/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ func (u *Updater) update(ctx context.Context, cache *cacheForCommit) status.Mult
if err != nil {
return err
}

// Add new resources to the watch list, without removing old ones.
// This ensures controller conflicts are caught while the applier is running.
declaredGVKs, _ := u.Resources.DeclaredGVKs()
err = u.addWatches(ctx, declaredGVKs)
if err != nil {
return err
}

// Only mark the declared resources as updated if there were no (non-blocking) parse errors.
// This ensures the update will be retried until parsing fully succeeds.
if cache.parserErrs == nil {
Expand All @@ -143,7 +152,7 @@ func (u *Updater) update(ctx context.Context, cache *cacheForCommit) status.Mult
// Update the resource watches (triggers for the Remediator).
if !cache.watchesUpdated {
declaredGVKs, _ := u.Resources.DeclaredGVKs()
err := u.watch(ctx, declaredGVKs)
err := u.updateWatches(ctx, declaredGVKs)
if err != nil {
return err
}
Expand Down Expand Up @@ -213,9 +222,21 @@ func (u *Updater) apply(ctx context.Context, objs []client.Object, commit string
return nil
}

// watch updates the Remediator's watches to start new ones and stop old
// ones.
func (u *Updater) watch(ctx context.Context, gvks map[schema.GroupVersionKind]struct{}) status.MultiError {
// watch add new resources to the Remediator's watches
func (u *Updater) addWatches(ctx context.Context, gvks map[schema.GroupVersionKind]struct{}) status.MultiError {
klog.V(1).Info("Remediator watches addeding...")
watchErrs := u.Remediator.AddWatches(ctx, gvks)
u.SyncErrorCache.SetWatchErrs(watchErrs)
if watchErrs != nil {
klog.Warningf("Failed to add resource watches: %v", watchErrs)
return watchErrs
}
klog.V(3).Info("Remediator watches added")
return nil
}

// watch add new resources and removes old resources to the Remediator's watches
func (u *Updater) updateWatches(ctx context.Context, gvks map[schema.GroupVersionKind]struct{}) status.MultiError {
klog.V(1).Info("Remediator watches updating...")
watchErrs := u.Remediator.UpdateWatches(ctx, gvks)
u.SyncErrorCache.SetWatchErrs(watchErrs)
Expand Down
14 changes: 14 additions & 0 deletions pkg/remediator/fake/remediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
type Remediator struct {
ManagementConflictOutput bool
Watches map[schema.GroupVersionKind]struct{}
AddWatchesError status.MultiError
UpdateWatchesError status.MultiError
Watching bool
Paused bool
Expand Down Expand Up @@ -61,6 +62,19 @@ func (r *Remediator) Resume() {
r.Paused = false
}

// AddWatches fakes remediator.Remediator.AddWatches
func (r *Remediator) AddWatches(_ context.Context, watches map[schema.GroupVersionKind]struct{}) status.MultiError {
r.Watching = true
if r.Watches == nil {
r.Watches = watches
} else {
for gvk := range watches {
r.Watches[gvk] = struct{}{}
}
}
return r.AddWatchesError
}

// UpdateWatches fakes remediator.Remediator.UpdateWatches
func (r *Remediator) UpdateWatches(_ context.Context, watches map[schema.GroupVersionKind]struct{}) status.MultiError {
r.Watching = true
Expand Down
8 changes: 8 additions & 0 deletions pkg/remediator/remediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ type Interface interface {
// NeedsUpdate returns true if the Remediator needs its watches to be updated
// (typically due to some asynchronous error that occurred).
NeedsUpdate() bool
// AddWatches starts server-side watches based upon the given map of GVKs
// which should be watched.
AddWatches(context.Context, map[schema.GroupVersionKind]struct{}) status.MultiError
// UpdateWatches starts and stops server-side watches based upon the given map
// of GVKs which should be watched.
UpdateWatches(context.Context, map[schema.GroupVersionKind]struct{}) status.MultiError
Expand Down Expand Up @@ -224,6 +227,11 @@ func (r *Remediator) NeedsUpdate() bool {
return r.watchMgr.NeedsUpdate()
}

// AddWatches implements Interface.
func (r *Remediator) AddWatches(ctx context.Context, gvks map[schema.GroupVersionKind]struct{}) status.MultiError {
return r.watchMgr.AddWatches(ctx, gvks)
}

// UpdateWatches implements Interface.
func (r *Remediator) UpdateWatches(ctx context.Context, gvks map[schema.GroupVersionKind]struct{}) status.MultiError {
return r.watchMgr.UpdateWatches(ctx, gvks)
Expand Down
29 changes: 19 additions & 10 deletions pkg/remediator/watch/listwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,23 @@ type ListerWatcher interface {
Watcher
}

// ListerWatcherFactory knows how to build ListerWatchers for the specified
// GroupVersionKind and Namespace.
type ListerWatcherFactory func(gvk schema.GroupVersionKind, namespace string) ListerWatcher
type DynamicListerWatcherFactory struct {
DynamicClient *dynamic.DynamicClient
Mapper meta.RESTMapper
}

// ListerWatcher constructs a ListerWatcher for the specified GroupVersionKind
// and Namespace.
func (dlwf *DynamicListerWatcherFactory) ListerWatcher(gvk schema.GroupVersionKind, namespace string) ListerWatcher {
return NewListWatchFromClient(dlwf.DynamicClient, dlwf.Mapper, gvk, namespace)
}

// NewListerWatcherFactoryFromClient creates a ListerWatcherFactory using a
// dynamic client and mapper build from the specified REST config.
func NewListerWatcherFactoryFromClient(cfg *rest.Config) (ListerWatcherFactory, error) {
// DynamicListerWatcherFactoryFromConfig constructs a DynamicListerWatcherFactory
func DynamicListerWatcherFactoryFromConfig(cfg *rest.Config) (*DynamicListerWatcherFactory, error) {
dynamicClient, err := dynamic.NewForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("failed to build dynamic client: %w", err)
}

httpClient, err := rest.HTTPClientFor(cfg)
if err != nil {
return nil, fmt.Errorf("failed to create HTTPClient: %w", err)
Expand All @@ -75,12 +80,16 @@ func NewListerWatcherFactoryFromClient(cfg *rest.Config) (ListerWatcherFactory,
if err != nil {
return nil, fmt.Errorf("failed to build mapper: %w", err)
}

return func(gvk schema.GroupVersionKind, namespace string) ListerWatcher {
return NewListWatchFromClient(dynamicClient, mapper, gvk, namespace)
return &DynamicListerWatcherFactory{
DynamicClient: dynamicClient,
Mapper: mapper,
}, nil
}

// ListerWatcherFactory knows how to build ListerWatchers for the specified
// GroupVersionKind and Namespace.
type ListerWatcherFactory func(gvk schema.GroupVersionKind, namespace string) ListerWatcher

// ListFunc knows how to list resources.
type ListFunc func(ctx context.Context, options metav1.ListOptions) (*unstructured.UnstructuredList, error)

Expand Down
61 changes: 59 additions & 2 deletions pkg/remediator/watch/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"sync"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -51,6 +52,9 @@ type Manager struct {
// watcherFactory is the function to create a watcher.
watcherFactory watcherFactory

// mapper is the RESTMapper used by the watcherFactory
mapper meta.RESTMapper

// labelSelector filters watches
labelSelector labels.Selector

Expand All @@ -68,18 +72,20 @@ type Manager struct {
// Options contains options for creating a watch manager.
type Options struct {
watcherFactory watcherFactory
mapper meta.RESTMapper
}

// DefaultOptions return the default options with a ListerWatcherFactory built
// from the specified REST config.
func DefaultOptions(cfg *rest.Config) (*Options, error) {
factory, err := NewListerWatcherFactoryFromClient(cfg)
factory, err := DynamicListerWatcherFactoryFromConfig(cfg)
if err != nil {
return nil, status.APIServerError(err, "failed to build ListerWatcherFactory")
}

return &Options{
watcherFactory: watcherFactoryFromListerWatcherFactory(factory),
watcherFactory: watcherFactoryFromListerWatcherFactory(factory.ListerWatcher),
mapper: factory.Mapper,
}, nil
}

Expand Down Expand Up @@ -107,6 +113,7 @@ func NewManager(scope declared.Scope, syncName string, cfg *rest.Config,
resources: decls,
watcherMap: make(map[schema.GroupVersionKind]Runnable),
watcherFactory: options.watcherFactory,
mapper: options.mapper,
labelSelector: labelSelector,
queue: q,
conflictHandler: ch,
Expand All @@ -127,6 +134,55 @@ func (m *Manager) NeedsUpdate() bool {
return m.needsUpdate
}

// AddWatches accepts a map of GVKs that should be watched and takes the
// following actions:
// - start watchers for any GroupVersionKind that is present in the given map
// and not present in the current watch map.
//
// This function is threadsafe.
func (m *Manager) AddWatches(ctx context.Context, gvkMap map[schema.GroupVersionKind]struct{}) status.MultiError {
m.mux.Lock()
defer m.mux.Unlock()
m.watching = true

klog.V(3).Infof("AddWatches(%v)", gvkMap)

var startedWatches uint64

// Start new watchers
var errs status.MultiError
for gvk := range gvkMap {
// Skip watchers that are already started
if _, isWatched := m.watcherMap[gvk]; isWatched {
continue
}
// Only start watcher if the resource exists.
// Watch will be started later by UpdateWatches, after the applier succeeds.
// TODO: register pending resource and start watch when CRD is established
if _, err := m.mapper.RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
if meta.IsNoMatchError(err) {
klog.Infof("Remediator skipped adding watch for resource %v: %v: resource watch will be started after apply is successful", gvk, err)
continue
}
errs = status.Append(errs, err)
continue
}
// We don't have a watcher for this type, so add a watcher for it.
if err := m.startWatcher(ctx, gvk); err != nil {
errs = status.Append(errs, err)
continue
}
startedWatches++
}

if startedWatches > 0 {
klog.Infof("The remediator made new progress: started %d new watches", startedWatches)
} else {
klog.V(4).Infof("The remediator made no new progress")
}
return errs
}

// UpdateWatches accepts a map of GVKs that should be watched and takes the
// following actions:
// - stop watchers for any GroupVersionKind that is not present in the given
Expand Down Expand Up @@ -162,6 +218,7 @@ func (m *Manager) UpdateWatches(ctx context.Context, gvkMap map[schema.GroupVers
// We don't have a watcher for this type, so add a watcher for it.
if err := m.startWatcher(ctx, gvk); err != nil {
errs = status.Append(errs, err)
continue
}
startedWatches++
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/syncer/syncertest/fake/conflict_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func (h *ConflictHandler) ConflictErrors() []status.ManagementConflictError {
return nil
}

// HasConflictErrors is a fake implementation of HasConflictErrors of conflict.Handler.
func (h *ConflictHandler) HasConflictErrors() bool {
return false
}

// ClearConflictErrorsWithKind is a fake implementation of ClearConflictErrorsWithKind of conflict.Handler.
func (h *ConflictHandler) ClearConflictErrorsWithKind(schema.GroupKind) {
}
Expand Down

0 comments on commit bb9361f

Please sign in to comment.