diff --git a/go.mod b/go.mod index 63aa2eb7a..e8b9bba50 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( k8s.io/utils v0.0.0-20240310230437-4693a0247e57 open-cluster-management.io/addon-framework v0.9.1 open-cluster-management.io/api v0.13.1-0.20240411131856-8f6aa25f111c - open-cluster-management.io/sdk-go v0.13.1-0.20240313075541-00a94671ced1 + open-cluster-management.io/sdk-go v0.13.1-0.20240415030117-612344aae744 sigs.k8s.io/controller-runtime v0.17.2 sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96 ) diff --git a/go.sum b/go.sum index 1426c1650..46f6b3072 100644 --- a/go.sum +++ b/go.sum @@ -427,8 +427,8 @@ open-cluster-management.io/addon-framework v0.9.1 h1:m6n/W29G/4KzMx+8mgC9P/ybuiy open-cluster-management.io/addon-framework v0.9.1/go.mod h1:OEIFCEXhZKO/Grv08CB0T+TGzS0bLshw4G9u7Vw8dw0= open-cluster-management.io/api v0.13.1-0.20240411131856-8f6aa25f111c h1:/iUoY6/PqBmcBq3v0+UBFvIcI39k/QPRGqpOv9XtDIc= open-cluster-management.io/api v0.13.1-0.20240411131856-8f6aa25f111c/go.mod h1:CuCPEzXDvOyxBB0H1d1eSeajbHqaeGEKq9c63vQc63w= -open-cluster-management.io/sdk-go v0.13.1-0.20240313075541-00a94671ced1 h1:s3dJdi1eol+/8ek6JQuaEuoGPkK/wRyM9zowqzKHPDY= -open-cluster-management.io/sdk-go v0.13.1-0.20240313075541-00a94671ced1/go.mod h1:sq+amR9Ls9JzMP5dypvlCx4jIGfDg45gicS67Z/MnlI= +open-cluster-management.io/sdk-go v0.13.1-0.20240415030117-612344aae744 h1:dBO6eK3gHSoRpl8OckW1zyOp35BOI48rYgoCznrPn40= +open-cluster-management.io/sdk-go v0.13.1-0.20240415030117-612344aae744/go.mod h1:w2OaxtCyegxeyFLU42UQ3oxUz01QdsBQkcHI17T/l48= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y= sigs.k8s.io/controller-runtime v0.17.2 h1:FwHwD1CTUemg0pW2otk7/U5/i5m2ymzvOXdbeGOUvw0= diff --git a/vendor/modules.txt b/vendor/modules.txt index 6d1b14233..27393e24c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1578,7 +1578,7 @@ open-cluster-management.io/api/utils/work/v1/workapplier open-cluster-management.io/api/utils/work/v1/workvalidator open-cluster-management.io/api/work/v1 open-cluster-management.io/api/work/v1alpha1 -# open-cluster-management.io/sdk-go v0.13.1-0.20240313075541-00a94671ced1 +# open-cluster-management.io/sdk-go v0.13.1-0.20240415030117-612344aae744 ## explicit; go 1.21 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1 diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/agentclient.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/agentclient.go index fc451156f..417b31665 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/agentclient.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/agentclient.go @@ -282,7 +282,23 @@ func (c *CloudEventAgentClient[T]) specAction(source string, obj T) (evt types.R return types.Deleted, nil } - if obj.GetResourceVersion() == lastObj.GetResourceVersion() { + // if both the current and the last object have the resource version "0", then object + // is considered as modified, the message broker guarantees the order of the messages + if obj.GetResourceVersion() == "0" && lastObj.GetResourceVersion() == "0" { + return types.Modified, nil + } + + resourceVersion, err := strconv.ParseInt(obj.GetResourceVersion(), 10, 64) + if err != nil { + return evt, err + } + + lastResourceVersion, err := strconv.ParseInt(lastObj.GetResourceVersion(), 10, 64) + if err != nil { + return evt, err + } + + if resourceVersion <= lastResourceVersion { return evt, nil } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/baseclient.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/baseclient.go index 5c78bf7a9..0ba5c7187 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/baseclient.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/baseclient.go @@ -27,6 +27,7 @@ type receiveFn func(ctx context.Context, evt cloudevents.Event) type baseClient struct { sync.RWMutex cloudEventsOptions options.CloudEventsOptions + cloudEventsProtocol options.CloudEventsProtocol cloudEventsClient cloudevents.Client cloudEventsRateLimiter flowcontrol.RateLimiter receiverChan chan int @@ -35,7 +36,7 @@ type baseClient struct { func (c *baseClient) connect(ctx context.Context) error { var err error - c.cloudEventsClient, err = c.cloudEventsOptions.Client(ctx) + c.cloudEventsClient, err = c.newCloudEventsClient(ctx) if err != nil { return err } @@ -57,7 +58,8 @@ func (c *baseClient) connect(ctx context.Context) error { for { if cloudEventsClient == nil { klog.V(4).Infof("reconnecting the cloudevents client") - cloudEventsClient, err = c.cloudEventsOptions.Client(ctx) + + c.cloudEventsClient, err = c.newCloudEventsClient(ctx) // TODO enhance the cloudevents SKD to avoid wrapping the error type to distinguish the net connection // errors if err != nil { @@ -77,6 +79,9 @@ func (c *baseClient) connect(ctx context.Context) error { select { case <-ctx.Done(): + if c.receiverChan != nil { + close(c.receiverChan) + } return case err, ok := <-c.cloudEventsOptions.ErrorChan(): if !ok { @@ -90,7 +95,12 @@ func (c *baseClient) connect(ctx context.Context) error { // client to nil and retry c.sendReceiverSignal(stopReceiverSignal) + err = c.cloudEventsProtocol.Close(ctx) + if err != nil { + runtime.HandleError(fmt.Errorf("failed to close the cloudevents protocol, %v", err)) + } cloudEventsClient = nil + c.resetClient(cloudEventsClient) <-wait.RealTimer(delayFn()).C() @@ -167,7 +177,6 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) { select { case <-ctx.Done(): receiverCancel() - close(c.receiverChan) return case signal, ok := <-c.receiverChan: if !ok { @@ -219,3 +228,16 @@ func (c *baseClient) sendReconnectedSignal() { defer c.RUnlock() c.reconnectedChan <- struct{}{} } + +func (c *baseClient) newCloudEventsClient(ctx context.Context) (cloudevents.Client, error) { + var err error + c.cloudEventsProtocol, err = c.cloudEventsOptions.Protocol(ctx) + if err != nil { + return nil, err + } + c.cloudEventsClient, err = cloudevents.NewClient(c.cloudEventsProtocol) + if err != nil { + return nil, err + } + return c.cloudEventsClient, nil +} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/agentoptions.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/agentoptions.go index 293f4b178..7e94551bd 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/agentoptions.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/agentoptions.go @@ -33,8 +33,8 @@ func (o *grpcAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E return ctx, nil } -func (o *grpcAgentOptions) Client(ctx context.Context) (cloudevents.Client, error) { - receiver, err := o.GetCloudEventsClient( +func (o *grpcAgentOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) { + receiver, err := o.GetCloudEventsProtocol( ctx, func(err error) { o.errorChan <- err diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/options.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/options.go index 403673c09..5f0f8ecd6 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/options.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/options.go @@ -14,8 +14,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "gopkg.in/yaml.v2" - cloudevents "github.com/cloudevents/sdk-go/v2" - + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol" ) @@ -119,7 +118,7 @@ func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) { return conn, nil } -func (o *GRPCOptions) GetCloudEventsClient(ctx context.Context, errorHandler func(error), clientOpts ...protocol.Option) (cloudevents.Client, error) { +func (o *GRPCOptions) GetCloudEventsProtocol(ctx context.Context, errorHandler func(error), clientOpts ...protocol.Option) (options.CloudEventsProtocol, error) { conn, err := o.GetGRPCClientConn() if err != nil { return nil, err @@ -146,10 +145,5 @@ func (o *GRPCOptions) GetCloudEventsClient(ctx context.Context, errorHandler fun opts := []protocol.Option{} opts = append(opts, clientOpts...) - p, err := protocol.NewProtocol(conn, opts...) - if err != nil { - return nil, err - } - - return cloudevents.NewClient(p) + return protocol.NewProtocol(conn, opts...) } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/sourceoptions.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/sourceoptions.go index e9a65ff02..1d71cfe7d 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/sourceoptions.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/sourceoptions.go @@ -31,8 +31,8 @@ func (o *gRPCSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents. return ctx, nil } -func (o *gRPCSourceOptions) Client(ctx context.Context) (cloudevents.Client, error) { - receiver, err := o.GetCloudEventsClient( +func (o *gRPCSourceOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) { + receiver, err := o.GetCloudEventsProtocol( ctx, func(err error) { o.errorChan <- err diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/agentoptions.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/agentoptions.go index 526c7feb5..9d1ef702e 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/agentoptions.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/agentoptions.go @@ -82,7 +82,7 @@ func (o *mqttAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E return cloudeventscontext.WithTopic(ctx, eventsTopic), nil } -func (o *mqttAgentOptions) Client(ctx context.Context) (cloudevents.Client, error) { +func (o *mqttAgentOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) { subscribe := &paho.Subscribe{ Subscriptions: map[string]paho.SubscribeOptions{ // TODO support multiple sources, currently the client require the source events topic has a sourceID, in @@ -97,7 +97,7 @@ func (o *mqttAgentOptions) Client(ctx context.Context) (cloudevents.Client, erro subscribe.Subscriptions[o.Topics.SourceBroadcast] = paho.SubscribeOptions{QoS: byte(o.SubQoS)} } - receiver, err := o.GetCloudEventsClient( + return o.GetCloudEventsProtocol( ctx, fmt.Sprintf("%s-client", o.agentID), func(err error) { @@ -106,10 +106,6 @@ func (o *mqttAgentOptions) Client(ctx context.Context) (cloudevents.Client, erro cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}), cloudeventsmqtt.WithSubscribe(subscribe), ) - if err != nil { - return nil, err - } - return receiver, nil } func (o *mqttAgentOptions) ErrorChan() <-chan error { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/options.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/options.go index 56b13be94..cfce3004c 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/options.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/options.go @@ -12,11 +12,11 @@ import ( "time" cloudeventsmqtt "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2" - cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/eclipse/paho.golang/packets" "github.com/eclipse/paho.golang/paho" "gopkg.in/yaml.v2" "k8s.io/apimachinery/pkg/util/errors" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" ) @@ -198,12 +198,12 @@ func (o *MQTTOptions) GetMQTTConnectOption(clientID string) *paho.Connect { return connect } -func (o *MQTTOptions) GetCloudEventsClient( +func (o *MQTTOptions) GetCloudEventsProtocol( ctx context.Context, clientID string, errorHandler func(error), clientOpts ...cloudeventsmqtt.Option, -) (cloudevents.Client, error) { +) (options.CloudEventsProtocol, error) { netConn, err := o.GetNetConn() if err != nil { return nil, err @@ -217,12 +217,7 @@ func (o *MQTTOptions) GetCloudEventsClient( opts := []cloudeventsmqtt.Option{cloudeventsmqtt.WithConnect(o.GetMQTTConnectOption(clientID))} opts = append(opts, clientOpts...) - protocol, err := cloudeventsmqtt.New(ctx, config, opts...) - if err != nil { - return nil, err - } - - return cloudevents.NewClient(protocol) + return cloudeventsmqtt.New(ctx, config, opts...) } func validateTopics(topics *types.Topics) error { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/sourceoptions.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/sourceoptions.go index 1f51e378c..9c245809a 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/sourceoptions.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/sourceoptions.go @@ -70,7 +70,7 @@ func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents. return cloudeventscontext.WithTopic(ctx, eventsTopic), nil } -func (o *mqttSourceOptions) Client(ctx context.Context) (cloudevents.Client, error) { +func (o *mqttSourceOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) { topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents) if err != nil { return nil, err @@ -93,7 +93,7 @@ func (o *mqttSourceOptions) Client(ctx context.Context) (cloudevents.Client, err subscribe.Subscriptions[o.Topics.AgentBroadcast] = paho.SubscribeOptions{QoS: byte(o.SubQoS)} } - receiver, err := o.GetCloudEventsClient( + receiver, err := o.GetCloudEventsProtocol( ctx, o.clientID, func(err error) { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/options.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/options.go index 57fbf14e9..9bd231c89 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/options.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/options.go @@ -4,6 +4,7 @@ import ( "context" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/protocol" ) // CloudEventsOptions provides cloudevents clients to send/receive cloudevents based on different event protocol. @@ -16,14 +17,22 @@ type CloudEventsOptions interface { // the MQTT topic, for Kafka, the context should contain the message key, etc. WithContext(ctx context.Context, evtContext cloudevents.EventContext) (context.Context, error) - // Client returns a cloudevents client for sending and receiving cloudevents - Client(ctx context.Context) (cloudevents.Client, error) + // Protocol returns a specific protocol to initialize the cloudevents client + Protocol(ctx context.Context) (CloudEventsProtocol, error) // ErrorChan returns a chan which will receive the cloudevents connection error. The source/agent client will try to // reconnect the when this error occurs. ErrorChan() <-chan error } +// CloudEventsProtocol is a set of interfaces for a specific binding need to implemented +// Reference: https://cloudevents.github.io/sdk-go/protocol_implementations.html#protocol-interfaces +type CloudEventsProtocol interface { + protocol.Sender + protocol.Receiver + protocol.Closer +} + // EventRateLimit for limiting the event sending rate. type EventRateLimit struct { // QPS indicates the maximum QPS to send the event. diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/handler/resourcehandler.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/handler/resourcehandler.go index fa3901430..983c7634c 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/handler/resourcehandler.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/handler/resourcehandler.go @@ -2,11 +2,9 @@ package handler import ( "fmt" - "strconv" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/watch" - "k8s.io/klog/v2" workv1lister "open-cluster-management.io/api/client/work/listers/work/v1" workv1 "open-cluster-management.io/api/work/v1" @@ -29,21 +27,6 @@ func NewManifestWorkAgentHandler(lister workv1lister.ManifestWorkNamespaceLister return err } - resourceVersion, err := strconv.ParseInt(work.ResourceVersion, 10, 64) - if err != nil { - return fmt.Errorf("failed to parse the resourceVersion of the manifestwork %s, %v", work.Name, err) - } - - lastResourceVersion, err := strconv.ParseInt(lastWork.ResourceVersion, 10, 64) - if err != nil { - return fmt.Errorf("failed to parse the resourceVersion of the manifestwork %s, %v", lastWork.Name, err) - } - - if resourceVersion <= lastResourceVersion { - klog.Infof("The work %s resource version is less than or equal to cached, ignore", work.Name) - return nil - } - updatedWork := work.DeepCopy() // restore the fields that are maintained by local agent diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go index bc0e92eff..c9d6083e3 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go @@ -175,11 +175,6 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku return nil, err } - if generation <= lastWork.Generation { - return nil, fmt.Errorf("the work %s/%s current generation %d is less than or equal to the last generation %d", - c.namespace, name, generation, lastWork.Generation) - } - eventDataType, err := types.ParseCloudEventsDataType(lastWork.Annotations[common.CloudEventsDataTypeAnnotationKey]) if err != nil { return nil, err @@ -202,15 +197,19 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku return newWork.DeepCopy(), nil } +// getWorkGeneration retrieves the work generation from the annotation with the key +// "cloudevents.open-cluster-management.io/generation". +// if no generation is set in the annotation, then 0 is returned, which means the message +// broker guarantees the message order. func getWorkGeneration(work *workv1.ManifestWork) (int64, error) { generation, ok := work.Annotations[common.CloudEventsGenerationAnnotationKey] if !ok { - return -1, fmt.Errorf("the annotation %s is not found from work %s", common.CloudEventsGenerationAnnotationKey, work.UID) + return 0, nil } generationInt, err := strconv.Atoi(generation) if err != nil { - return -1, err + return 0, fmt.Errorf("failed to convert generation %s to int: %v", generation, err) } return int64(generationInt), nil