Skip to content

Commit

Permalink
integration test for cloudevents client
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <liuweixa@redhat.com>
  • Loading branch information
skeeey committed Sep 8, 2023
1 parent cf1ead4 commit eb87d48
Show file tree
Hide file tree
Showing 123 changed files with 25,078 additions and 4 deletions.
147 changes: 147 additions & 0 deletions cloudevents/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# Cloudevents Clients

We have implemented the [cloudevents](https://cloudevents.io/)-based clients in this package to assist developers in
easily implementing the [Event Based Manifestwork](https://github.com/open-cluster-management-io/enhancements/tree/main/enhancements/sig-architecture/224-event-based-manifestwork)
proposal.

## Generic Clients

The generic client (`generic.CloudEventsClient`) is used to resync/publish/subscribe resource objects between sources
and agents with cloudevents.

A resource object can be any object that implements the `generic.ResourceObject` interface.

### Building a generic client on a source

Developers can use `generic.NewCloudEventSourceClient` method to build a generic client on the source. To build this
client the developers need to provide

1. A cloudevents source options (`options.CloudEventsSourceOptions`), this options have two parts
- `sourceID`, it is a unique identifier for a source, for example, it can generate a source ID by hashing the hub
cluster URL and appending the controller name. Similarly, a RESTful service can select a unique name or generate a
unique ID in the associated database for its source identification.
- `CloudEventsOptions`, it provides cloudevents clients to send/receive cloudevents based on different event
protocol. We have supported the MQTT protocol (`mqtt.NewSourceOptions`), developers can use it directly.

2. A resource lister (`generic.Lister`), it is used to list the resource objects on the source when resyncing the
resources between sources and agents, for example, a hub controller can list the resources from the resource informers,
and a RESTful service can list its resources from a database.

3. A resource status hash getter method (`generic.StatusHashGetter`), this method will be used to calculate the resource
status hash when resyncing the resource status between sources and agents.

4. Codecs (`generic.Codec`), they are used to encode a resource object into a cloudevent and decode a cloudevent into a
resource object with a given cloudevent data type. We have provided two data types (`io.open-cluster-management.works.v1alpha1.manifests`
that contains a single resource object in the cloudevent payload and `io.open-cluster-management.works.v1alpha1.manifestbundles`
that contains a list of resource objects in the cloudevent payload) for `ManifestWork`, they can be found in the `work/payload`
package.

5. Resource handler methods (`generic.ResourceHandler`), they are used to handle the resources status after the client
received the resources status from agents.

for example:

```golang
// build a client for the source1
client, err := generic.NewCloudEventSourceClient[*CustomerResource](
ctx,
mqtt.NewSourceOptions(mqtt.NewMQTTOptions(), "source1"),
customerResourceLister,
customerResourceStatusHashGetter,
customerResourceCodec,
)

// start a go routine to receive the resources status from agents
go func() {
if err := client.Subscribe(ctx, customerResourceHandler); err != nil {
//TODO handle this error when subscribing the cloudevents failed
}
}()
```

You may refer to the [cloudevents client integration test](../test/integration/cloudevents/source) as an example.

### Building a generic client on a manged cluster

Developers can use `generic.NewCloudEventAgentClient` method to build a generic client on a managed cluster. To build
this client the developers need to provide

1. A cloudevents agent options (`options.CloudEventsAgentOptions`), this options have three parts
- `agentID`, it is a unique identifier for an agent, for example, it can consist of a managed cluster name and an
agent name.
- `clusterName`, it is the name of a managed cluster on which the agent runs.
- `CloudEventsOptions`, it provides cloudevents clients to send/receive cloudevents based on different event
protocol. We have supported the MQTT protocol (`mqtt.NewAgentOptions`), developers can use it directly.

2. A resource lister (`generic.Lister`), it is used to list the resource objects on a managed cluster when resyncing the
resources between sources and agents, for example, a work agent can list its works from its work informers.

3. A resource status hash getter method (`generic.StatusHashGetter`), this method will be used to calculate the resource
status hash when resyncing the resource status between sources and agents.

4. Codecs (`generic.Codec`), they are used to encode a resource object into a cloudevent and decode a cloudevent into a
resource object with a given cloudevent data type. We have provided two data types (`io.open-cluster-management.works.v1alpha1.manifests`
that contains a single resource object in the cloudevent payload and `io.open-cluster-management.works.v1alpha1.manifestbundles`
that contains a list of resource objects in the cloudevent payload) for `ManifestWork`, they can be found in the `work/payload`
package.

5. Resource handler methods (`generic.ResourceHandler`), they are used to handle the resources after the client received
the resources from sources.

for example:

```golang
// build a client for a work agent on the cluster1
client, err := generic.NewCloudEventAgentClient[*CustomerResource](
ctx,
mqtt.NewAgentOptions(mqtt.NewMQTTOptions(), "cluster1", "cluster1-work-agent"),
&ManifestWorkLister{},
ManifestWorkStatusHash,
&ManifestBundleCodec{},
)

// start a go routine to receive the resources from sources
go func() {
if err := client.Subscribe(ctx, NewManifestWorkAgentHandler()); err != nil {
//TODO handle this error when subscribing the cloudevents failed
}
}()
```

## Work Clients

We have provided a builder to build the `ManifestWork` client (`ManifestWorkInterface`) and informer (`ManifestWorkInformer`)
based on the generic client.

### Building work client for work controllers on the hub cluster

TODO

### Building work client for work agent on the managed cluster

Developers can use the builder to build the `ManifestWork` client and informer with the cluster name.

```golang

clusterName := "cluster1"
// Building the clients based on cloudevents with MQTT
config := mqtt.NewMQTTOptions()

clientHolder, err := work.NewClientHolderBuilder(fmt.Sprintf("%s-work-agent", clusterName), config).
WithClusterName(clusterName).
// Supports two event data types for ManifestWork
WithCodecs(codec.NewManifestBundleCodec(), codec.NewManifestCodec(restMapper)).
NewClientHolder(ctx)
if err != nil {
return err
}

manifestWorkClient := clientHolder.ManifestWorks(clusterName)
manifestWorkInformer := clientHolder.ManifestWorkInformer()

// Building controllers with ManifestWork client and informer ...

// Start the ManifestWork informer
go manifestWorkInformer.Informer().Run(ctx.Done())

```
2 changes: 2 additions & 0 deletions cloudevents/generic/agentclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
//
// An agent is a component that handles the deployment of requested resources on the managed cluster and status report
// to the source.
//
// TODO support limiting the message sending rate with a configuration.
type CloudEventAgentClient[T ResourceObject] struct {
cloudEventsOptions options.CloudEventsOptions
sender cloudevents.Client
Expand Down
2 changes: 2 additions & 0 deletions cloudevents/generic/sourceclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
//
// A source is a component that runs on a server, it can be a controller on the hub cluster or a RESTful service
// handling resource requests.
//
// TODO support limiting the message sending rate with a configuration.
type CloudEventSourceClient[T ResourceObject] struct {
cloudEventsOptions options.CloudEventsOptions
sender cloudevents.Client
Expand Down
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/gogo/protobuf v1.3.2
github.com/google/uuid v1.2.0
github.com/mochi-mqtt/server/v2 v2.3.0
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.24.1
github.com/openshift/build-machinery-go v0.0.0-20230306181456-d321ffa04533
Expand Down Expand Up @@ -40,10 +41,13 @@ require (
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.6 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand All @@ -55,6 +59,8 @@ require (
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rs/xid v1.4.0 // indirect
github.com/rs/zerolog v1.28.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.24.0 // indirect
Expand Down
17 changes: 17 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20230807084042-7f5ef3
github.com/cloudevents/sdk-go/v2 v2.14.0 h1:Nrob4FwVgi5L4tV9lhjzZcjYqFVyJzsA56CwPaPfv6s=
github.com/cloudevents/sdk-go/v2 v2.14.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down Expand Up @@ -105,6 +106,7 @@ github.com/go-openapi/swag v0.19.14 h1:gm3vOOXfiuw5i9p5N9xJvfjvuofpyvLA9Wr6QfK5F
github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
Expand Down Expand Up @@ -170,13 +172,16 @@ github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
Expand Down Expand Up @@ -204,9 +209,15 @@ github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA=
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.2 h1:hAHbPm5IJGijwng3PWk09JkG9WeqChjprR5s9bBZ+OM=
github.com/matttproud/golang_protobuf_extensions v1.0.2/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/mochi-mqtt/server/v2 v2.3.0 h1:vcFb7X7ANH1Qy2yGHMvp86N9VxjoUkZpr5mkIbfMLfw=
github.com/mochi-mqtt/server/v2 v2.3.0/go.mod h1:47GGVR0/5gbM1DzsI0f1yo25jcR1aaUIgj4dzmP5MNY=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down Expand Up @@ -267,6 +278,10 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo=
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.28.0 h1:MirSo27VyNi7RJYP3078AA1+Cyzd2GB66qy3aUHvsWY=
github.com/rs/zerolog v1.28.0/go.mod h1:NILgTygv/Uej1ra5XxGf82ZFSLk58MFGAUS2o6usyD0=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
Expand Down Expand Up @@ -439,6 +454,8 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
23 changes: 23 additions & 0 deletions test/integration/cloudevents/agent/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package agent

import (
"context"

"open-cluster-management.io/api/cloudevents/generic/options/mqtt"
"open-cluster-management.io/api/cloudevents/work"
"open-cluster-management.io/api/cloudevents/work/agent/codec"
)

func StartWorkAgent(ctx context.Context, clusterName string, config *mqtt.MQTTOptions) (*work.ClientHolder, error) {
clientHolder, err := work.NewClientHolderBuilder(clusterName, config).
WithClusterName(clusterName).
WithCodecs(codec.NewManifestCodec(nil)).
NewClientHolder(ctx)
if err != nil {
return nil, err
}

go clientHolder.ManifestWorkInformer().Informer().Run(ctx.Done())

return clientHolder, nil
}
Loading

0 comments on commit eb87d48

Please sign in to comment.