From 94072679b3898442e13ba9fab1e1d20f53df8c80 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Tue, 18 Jun 2024 10:49:01 +0800 Subject: [PATCH] upgrade sdk-go Signed-off-by: Wei Liu --- go.mod | 2 +- go.sum | 4 +- .../integration/cloudevents/source/handler.go | 3 +- .../cloudevents/source/manifestwork.go | 5 +- test/integration/cloudevents/source/source.go | 3 +- .../cloudevents/source}/watcher.go | 2 +- vendor/modules.txt | 3 +- .../work/agent/client/manifestwork.go | 25 ++-- .../cloudevents/work/agent/lister/lister.go | 2 +- .../pkg/cloudevents/work/clientbuilder.go | 29 +++-- .../cloudevents/work/internal/clientset.go | 10 +- .../work/source/client/manifestwork.go | 4 +- .../cloudevents/work/source/lister/lister.go | 9 +- .../sdk-go/pkg/cloudevents/work/store/base.go | 111 +++++++++++++----- .../pkg/cloudevents/work/store/informer.go | 45 ++++--- .../pkg/cloudevents/work/store/interface.go | 12 +- .../pkg/cloudevents/work/store/local.go | 48 ++++---- .../pkg/cloudevents/work/utils/utils.go | 6 +- 18 files changed, 201 insertions(+), 122 deletions(-) rename {vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher => test/integration/cloudevents/source}/watcher.go (99%) diff --git a/go.mod b/go.mod index f62283ad3..15f0bf8fa 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( k8s.io/utils v0.0.0-20240310230437-4693a0247e57 open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556 open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc - open-cluster-management.io/sdk-go v0.13.1-0.20240614070053-a01091a14da7 + open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd sigs.k8s.io/controller-runtime v0.17.3 sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96 ) diff --git a/go.sum b/go.sum index a4a85b683..abd250ade 100644 --- a/go.sum +++ b/go.sum @@ -469,8 +469,8 @@ open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556 open-cluster-management.io/addon-framework v0.9.1-0.20240419070222-e703fc5a2556/go.mod h1:HayKCznnlyW+0dUJQGj5sNR6i3tvylSySD3YnvZkBtY= open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc h1:tcfncubZRFphYtDXBE7ApBNlSnj1RNazhW+8F01XYYg= open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc/go.mod h1:ltijKJhDifrPH0csvCUmFt5lzaERv+BBfh6X3l83rT0= -open-cluster-management.io/sdk-go v0.13.1-0.20240614070053-a01091a14da7 h1:/Tit/ldsK/+gwYpljBPzOGpFwdN44+yIOiHO+kja5XU= -open-cluster-management.io/sdk-go v0.13.1-0.20240614070053-a01091a14da7/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw= +open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd h1:kTVZOR7bTdh4ID7EoliyGhPR5CItpx8GehN581IxoPA= +open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y= sigs.k8s.io/controller-runtime v0.17.3 h1:65QmN7r3FWgTxDMz9fvGnO1kbf2nu+acg9p2R9oYYYk= diff --git a/test/integration/cloudevents/source/handler.go b/test/integration/cloudevents/source/handler.go index 58fd7ff23..dfdac0636 100644 --- a/test/integration/cloudevents/source/handler.go +++ b/test/integration/cloudevents/source/handler.go @@ -13,12 +13,11 @@ import ( workv1 "open-cluster-management.io/api/work/v1" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher" ) const ManifestWorkFinalizer = "cluster.open-cluster-management.io/manifest-work-cleanup" -func newManifestWorkStatusHandler(lister workv1lister.ManifestWorkLister, watcher *watcher.ManifestWorkWatcher) generic.ResourceHandler[*workv1.ManifestWork] { +func newManifestWorkStatusHandler(lister workv1lister.ManifestWorkLister, watcher *ManifestWorkWatcher) generic.ResourceHandler[*workv1.ManifestWork] { return func(action types.ResourceAction, work *workv1.ManifestWork) error { switch action { case types.StatusModified: diff --git a/test/integration/cloudevents/source/manifestwork.go b/test/integration/cloudevents/source/manifestwork.go index fd3aee49e..e0bb91cbb 100644 --- a/test/integration/cloudevents/source/manifestwork.go +++ b/test/integration/cloudevents/source/manifestwork.go @@ -20,7 +20,6 @@ import ( "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher" ) const ManifestsDeleted = "Deleted" @@ -32,7 +31,7 @@ const ( type manifestWorkSourceClient struct { cloudEventsClient *generic.CloudEventSourceClient[*workv1.ManifestWork] - watcher *watcher.ManifestWorkWatcher + watcher *ManifestWorkWatcher lister workv1lister.ManifestWorkLister namespace string } @@ -42,7 +41,7 @@ var manifestWorkGR = schema.GroupResource{Group: workv1.GroupName, Resource: "ma var _ workv1client.ManifestWorkInterface = &manifestWorkSourceClient{} func newManifestWorkSourceClient(cloudEventsClient *generic.CloudEventSourceClient[*workv1.ManifestWork], - watcher *watcher.ManifestWorkWatcher) *manifestWorkSourceClient { + watcher *ManifestWorkWatcher) *manifestWorkSourceClient { return &manifestWorkSourceClient{ cloudEventsClient: cloudEventsClient, watcher: watcher, diff --git a/test/integration/cloudevents/source/source.go b/test/integration/cloudevents/source/source.go index fd3998a38..685afc688 100644 --- a/test/integration/cloudevents/source/source.go +++ b/test/integration/cloudevents/source/source.go @@ -19,7 +19,6 @@ import ( "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" "open-cluster-management.io/sdk-go/pkg/cloudevents/work" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher" ) const ( @@ -89,7 +88,7 @@ func (m *MQTTSource) Start(ctx context.Context) error { // build a source client workLister := &manifestWorkLister{} - watcher := watcher.NewManifestWorkWatcher() + watcher := NewManifestWorkWatcher() mqttOptions, err := mqtt.BuildMQTTOptionsFromFlags(m.configFile) if err != nil { return err diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher/watcher.go b/test/integration/cloudevents/source/watcher.go similarity index 99% rename from vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher/watcher.go rename to test/integration/cloudevents/source/watcher.go index d22a7512a..913c52bb4 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher/watcher.go +++ b/test/integration/cloudevents/source/watcher.go @@ -1,4 +1,4 @@ -package watcher +package source import ( "sync" diff --git a/vendor/modules.txt b/vendor/modules.txt index d13b7ab51..f0c020e0e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1589,7 +1589,7 @@ open-cluster-management.io/api/utils/work/v1/workapplier open-cluster-management.io/api/utils/work/v1/workvalidator open-cluster-management.io/api/work/v1 open-cluster-management.io/api/work/v1alpha1 -# open-cluster-management.io/sdk-go v0.13.1-0.20240614070053-a01091a14da7 +# open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd ## explicit; go 1.21 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1 @@ -1620,7 +1620,6 @@ open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister open-cluster-management.io/sdk-go/pkg/cloudevents/work/store open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils -open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher open-cluster-management.io/sdk-go/pkg/patcher # sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 ## explicit; go 1.20 diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client/manifestwork.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client/manifestwork.go index 47754f67f..f00b20641 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client/manifestwork.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client/manifestwork.go @@ -26,7 +26,9 @@ import ( type ManifestWorkAgentClient struct { cloudEventsClient *generic.CloudEventAgentClient[*workv1.ManifestWork] watcherStore store.WorkClientWatcherStore - clusterName string + + // this namespace should be same with the cluster name to which this client subscribes + namespace string } var _ workv1client.ManifestWorkInterface = &ManifestWorkAgentClient{} @@ -39,10 +41,13 @@ func NewManifestWorkAgentClient( return &ManifestWorkAgentClient{ cloudEventsClient: cloudEventsClient, watcherStore: watcherStore, - clusterName: clusterName, } } +func (c *ManifestWorkAgentClient) SetNamespace(namespace string) { + c.namespace = namespace +} + func (c *ManifestWorkAgentClient) Create(ctx context.Context, manifestWork *workv1.ManifestWork, opts metav1.CreateOptions) (*workv1.ManifestWork, error) { return nil, errors.NewMethodNotSupported(common.ManifestWorkGR, "create") } @@ -64,13 +69,13 @@ func (c *ManifestWorkAgentClient) DeleteCollection(ctx context.Context, opts met } func (c *ManifestWorkAgentClient) Get(ctx context.Context, name string, opts metav1.GetOptions) (*workv1.ManifestWork, error) { - klog.V(4).Infof("getting manifestwork %s", name) - return c.watcherStore.Get(c.clusterName, name) + klog.V(4).Infof("getting manifestwork %s/%s", c.namespace, name) + return c.watcherStore.Get(c.namespace, name) } func (c *ManifestWorkAgentClient) List(ctx context.Context, opts metav1.ListOptions) (*workv1.ManifestWorkList, error) { - klog.V(4).Infof("list manifestworks") - works, err := c.watcherStore.List(opts) + klog.V(4).Infof("list manifestworks from cluster %s", c.namespace) + works, err := c.watcherStore.List(c.namespace, opts) if err != nil { return nil, err } @@ -84,13 +89,13 @@ func (c *ManifestWorkAgentClient) List(ctx context.Context, opts metav1.ListOpti } func (c *ManifestWorkAgentClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { - return c.watcherStore, nil + klog.V(4).Infof("watch manifestworks from cluster %s", c.namespace) + return c.watcherStore.GetWatcher(c.namespace, opts) } func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kubetypes.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *workv1.ManifestWork, err error) { - klog.V(4).Infof("patching manifestwork %s", name) - - lastWork, err := c.watcherStore.Get(c.clusterName, name) + klog.V(4).Infof("patching manifestwork %s/%s", c.namespace, name) + lastWork, err := c.watcherStore.Get(c.namespace, name) if err != nil { return nil, err } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister/lister.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister/lister.go index e67bd9c1b..36a8ee9e7 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister/lister.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister/lister.go @@ -31,5 +31,5 @@ func (l *WatcherStoreLister) List(options types.ListOptions) ([]*workv1.Manifest opts.LabelSelector = fmt.Sprintf("%s=%s", common.CloudEventsOriginalSourceLabelKey, options.Source) } - return l.store.List(opts) + return l.store.List(options.ClusterName, opts) } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go index 4d2a96f26..ad156e0b9 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go @@ -141,17 +141,18 @@ func (b *ClientHolderBuilder) NewSourceClientHolder(ctx context.Context) (*Clien workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient} workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient} - if !b.resync { - return &ClientHolder{workClientSet: workClientSet}, nil - } - - // start a go routine to resync the works when this client reconnected + // start a go routine to receive client reconnect signal go func() { for { select { case <-ctx.Done(): return case <-cloudEventsClient.ReconnectedChan(): + if !b.resync { + klog.V(4).Infof("resync is disabled, do nothing") + continue + } + // when receiving a client reconnected signal, we resync all clusters for this source if err := cloudEventsClient.Resync(ctx, types.ClusterAll); err != nil { klog.Errorf("failed to send resync request, %v", err) @@ -160,6 +161,10 @@ func (b *ClientHolderBuilder) NewSourceClientHolder(ctx context.Context) (*Clien } }() + if !b.resync { + return &ClientHolder{workClientSet: workClientSet}, nil + } + // start a go routine to resync the works after this client's store is initiated go func() { if store.WaitForStoreInit(ctx, b.watcherStore.HasInitiated) { @@ -209,16 +214,18 @@ func (b *ClientHolderBuilder) NewAgentClientHolder(ctx context.Context) (*Client workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient} workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient} - if !b.resync { - return &ClientHolder{workClientSet: workClientSet}, nil - } - + // start a go routine to receive client reconnect signal go func() { for { select { case <-ctx.Done(): return case <-cloudEventsClient.ReconnectedChan(): + if !b.resync { + klog.V(4).Infof("resync is disabled, do nothing") + continue + } + // when receiving a client reconnected signal, we resync all sources for this agent // TODO after supporting multiple sources, we should only resync agent known sources if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil { @@ -228,6 +235,10 @@ func (b *ClientHolderBuilder) NewAgentClientHolder(ctx context.Context) (*Client } }() + if !b.resync { + return &ClientHolder{workClientSet: workClientSet}, nil + } + // start a go routine to resync the works after this client's store is initiated go func() { if store.WaitForStoreInit(ctx, b.watcherStore.HasInitiated) { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal/clientset.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal/clientset.go index 768a6339b..6a00dfcc5 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal/clientset.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal/clientset.go @@ -7,6 +7,8 @@ import ( workclientset "open-cluster-management.io/api/client/work/clientset/versioned" workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1" workv1alpha1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1alpha1" + + agentclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client" sourceclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client" ) @@ -42,7 +44,13 @@ func (c *WorkV1ClientWrapper) ManifestWorks(namespace string) workv1client.Manif sourceManifestWorkClient.SetNamespace(namespace) return sourceManifestWorkClient } - return c.ManifestWorkClient + + if agentManifestWorkClient, ok := c.ManifestWorkClient.(*agentclient.ManifestWorkAgentClient); ok { + agentManifestWorkClient.SetNamespace(namespace) + return agentManifestWorkClient + } + + return nil } func (c *WorkV1ClientWrapper) AppliedManifestWorks() workv1client.AppliedManifestWorkInterface { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go index 26bf55e32..42cf538f6 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go @@ -152,7 +152,7 @@ func (c *ManifestWorkSourceClient) Get(ctx context.Context, name string, opts me func (c *ManifestWorkSourceClient) List(ctx context.Context, opts metav1.ListOptions) (*workv1.ManifestWorkList, error) { klog.V(4).Infof("list manifestworks") - works, err := c.watcherStore.List(opts) + works, err := c.watcherStore.List(c.namespace, opts) if err != nil { return nil, err } @@ -166,7 +166,7 @@ func (c *ManifestWorkSourceClient) List(ctx context.Context, opts metav1.ListOpt } func (c *ManifestWorkSourceClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { - return c.watcherStore, nil + return c.watcherStore.GetWatcher(c.namespace, opts) } func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt kubetypes.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *workv1.ManifestWork, err error) { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister/lister.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister/lister.go index f939beac7..cd83e7975 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister/lister.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister/lister.go @@ -1,8 +1,6 @@ package lister import ( - "fmt" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" workv1 "open-cluster-management.io/api/work/v1" @@ -23,10 +21,5 @@ func NewWatcherStoreLister(store store.WorkClientWatcherStore) *WatcherStoreList // List returns the ManifestWorks from the WorkClientWatcherCache with list options. func (l *WatcherStoreLister) List(options types.ListOptions) ([]*workv1.ManifestWork, error) { - opts := metav1.ListOptions{} - if options.ClusterName != types.ClusterAll { - opts.FieldSelector = fmt.Sprintf("metadata.namespace=%s", options.ClusterName) - } - - return l.store.List(opts) + return l.store.List(options.ClusterName, metav1.ListOptions{}) } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/base.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/base.go index 5e21e7167..d3a5b5bf9 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/base.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/base.go @@ -29,43 +29,16 @@ const ManifestWorkFinalizer = "cloudevents.open-cluster-management.io/manifest-w type baseStore struct { sync.RWMutex - result chan watch.Event - done chan struct{} - - store cache.Store - + store cache.Store initiated bool - - // a queue to save the received work events - receivedWorks workqueue.RateLimitingInterface -} - -// ResultChan implements watch interface. -func (b *baseStore) ResultChan() <-chan watch.Event { - return b.result -} - -// Stop implements watch interface. -func (b *baseStore) Stop() { - // Call Close() exactly once by locking and setting a flag. - b.Lock() - defer b.Unlock() - - // closing a closed channel always panics, therefore check before closing - select { - case <-b.done: - close(b.result) - default: - close(b.done) - } } // List the works from the store with the list options -func (b *baseStore) List(opts metav1.ListOptions) ([]*workv1.ManifestWork, error) { +func (b *baseStore) List(namespace string, opts metav1.ListOptions) ([]*workv1.ManifestWork, error) { b.RLock() defer b.RUnlock() - return utils.ListWorksWithOptions(b.store, opts) + return utils.ListWorksWithOptions(b.store, namespace, opts) } // Get a works from the store @@ -105,10 +78,17 @@ func (b *baseStore) ListAll() ([]*workv1.ManifestWork, error) { return works, nil } -func (b *baseStore) HandleReceivedWork(action types.ResourceAction, work *workv1.ManifestWork) error { +type baseSourceStore struct { + baseStore + + // a queue to save the received work events + receivedWorks workqueue.RateLimitingInterface +} + +func (bs *baseSourceStore) HandleReceivedWork(action types.ResourceAction, work *workv1.ManifestWork) error { switch action { case types.StatusModified: - b.receivedWorks.Add(work) + bs.receivedWorks.Add(work) default: return fmt.Errorf("unsupported resource action %s", action) } @@ -239,6 +219,73 @@ func (b *workProcessor) getWork(uid kubetypes.UID) *workv1.ManifestWork { return nil } +// workWatcher implements the watch.Interface. +type workWatcher struct { + sync.RWMutex + + result chan watch.Event + done chan struct{} + stopped bool +} + +var _ watch.Interface = &workWatcher{} + +func newWorkWatcher() *workWatcher { + return &workWatcher{ + // It's easy for a consumer to add buffering via an extra + // goroutine/channel, but impossible for them to remove it, + // so nonbuffered is better. + result: make(chan watch.Event), + // If the watcher is externally stopped there is no receiver anymore + // and the send operations on the result channel, especially the + // error reporting might block forever. + // Therefore a dedicated stop channel is used to resolve this blocking. + done: make(chan struct{}), + } +} + +// ResultChan implements Interface. +func (w *workWatcher) ResultChan() <-chan watch.Event { + return w.result +} + +// Stop implements Interface. +func (w *workWatcher) Stop() { + // Call Close() exactly once by locking and setting a flag. + w.Lock() + defer w.Unlock() + // closing a closed channel always panics, therefore check before closing + select { + case <-w.done: + close(w.result) + default: + w.stopped = true + close(w.done) + } +} + +// Receive a event from the work client and sends down the result channel. +func (w *workWatcher) Receive(evt watch.Event) { + if w.isStopped() { + // this watcher is stopped, do nothing. + return + } + + if klog.V(4).Enabled() { + obj, _ := meta.Accessor(evt.Object) + klog.V(4).Infof("Receive the event %v for %v", evt.Type, obj.GetName()) + } + + w.result <- evt +} + +func (w *workWatcher) isStopped() bool { + w.RLock() + defer w.RUnlock() + + return w.stopped +} + func ensureFinalizers(workFinalizers []string) []string { has := false for _, f := range workFinalizers { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/informer.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/informer.go index f00f2f617..12193bb4b 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/informer.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/informer.go @@ -5,6 +5,7 @@ import ( "fmt" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -19,39 +20,39 @@ import ( // // It is used for building ManifestWork source client. type SourceInformerWatcherStore struct { - baseStore + baseSourceStore + watcher *workWatcher } -var _ watch.Interface = &SourceInformerWatcherStore{} var _ WorkClientWatcherStore = &SourceInformerWatcherStore{} func NewSourceInformerWatcherStore(ctx context.Context) *SourceInformerWatcherStore { s := &SourceInformerWatcherStore{ - baseStore: baseStore{ - result: make(chan watch.Event), - done: make(chan struct{}), + baseSourceStore: baseSourceStore{ + baseStore: baseStore{}, receivedWorks: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "informer-watcher-store"), }, + watcher: newWorkWatcher(), } // start a goroutine to process the received work events from the work queue with current store. - go newWorkProcessor(s.baseStore.receivedWorks, s).run(ctx.Done()) + go newWorkProcessor(s.receivedWorks, s).run(ctx.Done()) return s } func (s *SourceInformerWatcherStore) Add(work *workv1.ManifestWork) error { - s.result <- watch.Event{Type: watch.Added, Object: work} + s.watcher.Receive(watch.Event{Type: watch.Added, Object: work}) return nil } func (s *SourceInformerWatcherStore) Update(work *workv1.ManifestWork) error { - s.result <- watch.Event{Type: watch.Modified, Object: work} + s.watcher.Receive(watch.Event{Type: watch.Modified, Object: work}) return nil } func (s *SourceInformerWatcherStore) Delete(work *workv1.ManifestWork) error { - s.result <- watch.Event{Type: watch.Deleted, Object: work} + s.watcher.Receive(watch.Event{Type: watch.Deleted, Object: work}) return nil } @@ -59,6 +60,14 @@ func (s *SourceInformerWatcherStore) HasInitiated() bool { return s.initiated } +func (s *SourceInformerWatcherStore) GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error) { + if namespace != metav1.NamespaceAll { + return nil, fmt.Errorf("unsupported to watch from the namespace %s", namespace) + } + + return s.watcher, nil +} + func (s *SourceInformerWatcherStore) SetStore(store cache.Store) { s.store = store s.initiated = true @@ -71,32 +80,30 @@ func (s *SourceInformerWatcherStore) SetStore(store cache.Store) { // It is used for building ManifestWork agent client. type AgentInformerWatcherStore struct { baseStore + watcher *workWatcher } -var _ watch.Interface = &AgentInformerWatcherStore{} var _ WorkClientWatcherStore = &AgentInformerWatcherStore{} func NewAgentInformerWatcherStore() *AgentInformerWatcherStore { return &AgentInformerWatcherStore{ - baseStore: baseStore{ - result: make(chan watch.Event), - done: make(chan struct{}), - }, + baseStore: baseStore{}, + watcher: newWorkWatcher(), } } func (s *AgentInformerWatcherStore) Add(work *workv1.ManifestWork) error { - s.result <- watch.Event{Type: watch.Added, Object: work} + s.watcher.Receive(watch.Event{Type: watch.Added, Object: work}) return nil } func (s *AgentInformerWatcherStore) Update(work *workv1.ManifestWork) error { - s.result <- watch.Event{Type: watch.Modified, Object: work} + s.watcher.Receive(watch.Event{Type: watch.Modified, Object: work}) return nil } func (s *AgentInformerWatcherStore) Delete(work *workv1.ManifestWork) error { - s.result <- watch.Event{Type: watch.Deleted, Object: work} + s.watcher.Receive(watch.Event{Type: watch.Deleted, Object: work}) return nil } @@ -138,6 +145,10 @@ func (s *AgentInformerWatcherStore) HandleReceivedWork(action types.ResourceActi } } +func (s *AgentInformerWatcherStore) GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error) { + return s.watcher, nil +} + func (s *AgentInformerWatcherStore) HasInitiated() bool { return s.initiated } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/interface.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/interface.go index f5f72d1e3..90f7c6f90 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/interface.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/interface.go @@ -15,13 +15,13 @@ import ( const syncedPollPeriod = 100 * time.Millisecond -// StoreInitiated is a function that can be used to determine if an informer has synced. -// This is useful for determining if caches have synced. +// StoreInitiated is a function that can be used to determine if a store has initiated. type StoreInitiated func() bool -// WorkClientWatcherStore extends the watch interface with a work store. +// WorkClientWatcherStore provides a watcher with a work store. type WorkClientWatcherStore interface { - watch.Interface + // GetWatcher returns a watcher to receive work changes. + GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error) // HandleReceivedWork handles the client received work events. HandleReceivedWork(action types.ResourceAction, work *workv1.ManifestWork) error @@ -38,8 +38,8 @@ type WorkClientWatcherStore interface { // watcher store, in some case, it does not need to update a store, but just send a watch event. Delete(work *workv1.ManifestWork) error - // List returns the works from store with list options - List(opts metav1.ListOptions) ([]*workv1.ManifestWork, error) + // List returns the works from store for a given namespace with list options + List(namespace string, opts metav1.ListOptions) ([]*workv1.ManifestWork, error) // ListAll list all of the works from store ListAll() ([]*workv1.ManifestWork, error) diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/local.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/local.go index 17e6579a9..b56aa3ddf 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/local.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/local.go @@ -25,14 +25,14 @@ type watchEvent struct { Type watch.EventType } -var _ watch.Interface = &SourceLocalWatcherStore{} var _ WorkClientWatcherStore = &SourceLocalWatcherStore{} // SourceLocalWatcherStore caches the works in this local store and provide the watch ability by watch event channel. // // It is used for building ManifestWork source client. type SourceLocalWatcherStore struct { - baseStore + baseSourceStore + watcher *workWatcher eventQueue cache.Queue } @@ -56,24 +56,19 @@ func NewSourceLocalWatcherStore(ctx context.Context, listFunc ListLocalWorksFunc } s := &SourceLocalWatcherStore{ - baseStore: baseStore{ - // A channel for watcher, it's easy for a consumer to add buffering via an extra - // goroutine/channel, but impossible for them to remove it, so nonbuffered is better. - result: make(chan watch.Event), - // If the watcher is externally stopped there is no receiver anymore - // and the send operations on the result channel, especially the - // error reporting might block forever. - // Therefore a dedicated stop channel is used to resolve this blocking. - done: make(chan struct{}), - - store: store, - initiated: true, + baseSourceStore: baseSourceStore{ + baseStore: baseStore{ + store: store, + initiated: true, + }, // A queue to save the received work events, it helps us retry events // where errors occurred while processing receivedWorks: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "local-watcher-store"), }, + watcher: newWorkWatcher(), + // A queue to save the work client send events, if run a client without a watcher, // it will block the client, this queue helps to resolve this blocking. // Only save the latest event for a work. @@ -88,7 +83,7 @@ func NewSourceLocalWatcherStore(ctx context.Context, listFunc ListLocalWorksFunc } // start a goroutine to process the received work events from the work queue with current store. - go newWorkProcessor(s.baseStore.receivedWorks, s).run(ctx.Done()) + go newWorkProcessor(s.receivedWorks, s).run(ctx.Done()) // start a goroutine to handle the events that are produced by work client go wait.Until(s.processLoop, time.Second, ctx.Done()) @@ -132,6 +127,19 @@ func (s *SourceLocalWatcherStore) Delete(work *workv1.ManifestWork) error { return s.eventQueue.Update(&watchEvent{Key: key(work), Type: watch.Deleted}) } +func (s *SourceLocalWatcherStore) HasInitiated() bool { + return s.initiated +} + +func (s *SourceLocalWatcherStore) GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error) { + // TODO may consider to support watch with namespace + if namespace != metav1.NamespaceAll { + return nil, fmt.Errorf("unsupported to watch from the namespace %s", namespace) + } + + return s.watcher, nil +} + // processLoop drains the work event queue and send the event to the watch channel. func (s *SourceLocalWatcherStore) processLoop() { for { @@ -175,7 +183,7 @@ func (s *SourceLocalWatcherStore) processLoop() { // the work has been deleted, return a work only with its namespace and name // this will be blocked until this event is consumed - s.result <- watch.Event{ + s.watcher.Receive(watch.Event{ Type: watch.Deleted, Object: &workv1.ManifestWork{ ObjectMeta: metav1.ObjectMeta{ @@ -183,7 +191,7 @@ func (s *SourceLocalWatcherStore) processLoop() { Namespace: namespace, }, }, - } + }) return } @@ -198,14 +206,10 @@ func (s *SourceLocalWatcherStore) processLoop() { } // this will be blocked until this event is consumed - s.result <- watch.Event{Type: evt.Type, Object: work} + s.watcher.Receive(watch.Event{Type: evt.Type, Object: work}) } } -func (c *SourceLocalWatcherStore) HasInitiated() bool { - return c.initiated -} - func key(work *workv1.ManifestWork) string { return work.Namespace + "/" + work.Name } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils/utils.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils/utils.go index 886cab552..0e3c1df01 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils/utils.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils/utils.go @@ -66,7 +66,7 @@ func UID(sourceID, namespace, name string) string { } // ListWorksWithOptions retrieves the manifestworks from store which matches the options. -func ListWorksWithOptions(store cache.Store, opts metav1.ListOptions) ([]*workv1.ManifestWork, error) { +func ListWorksWithOptions(store cache.Store, namespace string, opts metav1.ListOptions) ([]*workv1.ManifestWork, error) { var err error labelSelector := labels.Everything() @@ -94,6 +94,10 @@ func ListWorksWithOptions(store cache.Store, opts metav1.ListOptions) ([]*workv1 return } + if namespace != metav1.NamespaceAll && work.Namespace != namespace { + return + } + workFieldSet := fields.Set{ "metadata.name": work.Name, "metadata.namespace": work.Namespace,