Skip to content

Commit

Permalink
support cloudevents for manifestworkreplicaset
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <liuweixa@redhat.com>
  • Loading branch information
skeeey committed Mar 6, 2024
1 parent 92d4f86 commit b0278e4
Show file tree
Hide file tree
Showing 62 changed files with 3,864 additions and 337 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
golang.org/x/net v0.19.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.29.1
k8s.io/apiextensions-apiserver v0.29.0
k8s.io/apimachinery v0.29.1
Expand All @@ -35,7 +36,7 @@ require (
k8s.io/utils v0.0.0-20240102154912-e7106e64919e
open-cluster-management.io/addon-framework v0.8.1-0.20240205013730-13fbb6259464
open-cluster-management.io/api v0.13.0
open-cluster-management.io/sdk-go v0.13.0
open-cluster-management.io/sdk-go v0.13.1-0.20240306030534-3142983462d9
sigs.k8s.io/controller-runtime v0.16.2
sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96
)
Expand Down Expand Up @@ -142,7 +143,6 @@ require (
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
helm.sh/helm/v3 v3.11.1 // indirect
k8s.io/kms v0.29.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,8 @@ open-cluster-management.io/addon-framework v0.8.1-0.20240205013730-13fbb6259464
open-cluster-management.io/addon-framework v0.8.1-0.20240205013730-13fbb6259464/go.mod h1:SBs6wF0Umzr5/miJb9p8uMaTDbcjphHHQLa76nXnbU8=
open-cluster-management.io/api v0.13.0 h1:dlcJEZlNlE0DmSDctK2s7iWKg9l+Tgb0V78Z040nMuk=
open-cluster-management.io/api v0.13.0/go.mod h1:CuCPEzXDvOyxBB0H1d1eSeajbHqaeGEKq9c63vQc63w=
open-cluster-management.io/sdk-go v0.13.0 h1:ddMGsPUekQr9z03tVN6vF39Uf+WEKMtGU/xSd81HdoA=
open-cluster-management.io/sdk-go v0.13.0/go.mod h1:UnsjzYOrDTF9a8rHEXksoIAtAdO1o5CD5Jtaw6T5B9w=
open-cluster-management.io/sdk-go v0.13.1-0.20240306030534-3142983462d9 h1:W3xPJTvHbLPG9Ni6u7wUatbscwxSJ+wt+XcNliQNaFk=
open-cluster-management.io/sdk-go v0.13.1-0.20240306030534-3142983462d9/go.mod h1:ozM8rSNL7acZEGzpqhv2W715Aqq83LAbSBBq7sp6Vv8=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y=
sigs.k8s.io/controller-runtime v0.16.2 h1:mwXAVuEk3EQf478PQwQ48zGOXvW27UJc8NHktQVuIPU=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ spec:
args:
- "/work"
- "manager"
- "--work-driver=kube"
{{ if .HostedMode }}
- "--kubeconfig=/var/run/secrets/hub/kubeconfig"
{{ end }}
Expand Down
11 changes: 8 additions & 3 deletions pkg/cmd/hub/work.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@ import (

// NewHubManager generates a command to start hub manager
func NewWorkController() *cobra.Command {
opts := commonoptions.NewOptions()
cmdConfig := opts.
NewControllerCommandConfig("work-manager", version.Get(), hub.RunWorkHubManager)
commonOpts := commonoptions.NewOptions()
hubOpts := hub.NewWorkHubManagerOptions()
hubCfg := hub.NewWorkHubManagerConfig(hubOpts)
cmdConfig := commonOpts.NewControllerCommandConfig("work-manager", version.Get(), hubCfg.RunWorkHubManager)
cmd := cmdConfig.NewCommandWithContext(context.TODO())
cmd.Use = "manager"
cmd.Short = "Start the Work Hub Manager"

flags := cmd.Flags()
commonOpts.AddFlags(flags)
hubOpts.AddFlags(flags)

return cmd
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestAddFinalizerTwiceReconcile(t *testing.T) {
workClient: fakeClient,
}

mwrSetTest, _, err := addFinalizerController.reconcile(context.TODO(), mwrSetTest)
_, _, err := addFinalizerController.reconcile(context.TODO(), mwrSetTest)
if err != nil {
t.Fatal(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,14 @@ const (
func NewManifestWorkReplicaSetController(
recorder events.Recorder,
workClient workclientset.Interface,
workApplier *workapplier.WorkApplier,
manifestWorkReplicaSetInformer workinformerv1alpha1.ManifestWorkReplicaSetInformer,
manifestWorkInformer workinformerv1.ManifestWorkInformer,
placementInformer clusterinformerv1beta1.PlacementInformer,
placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer) factory.Controller {

controller := newController(
workClient, manifestWorkReplicaSetInformer, manifestWorkInformer, placementInformer, placeDecisionInformer)
workClient, workApplier, manifestWorkReplicaSetInformer, manifestWorkInformer, placementInformer, placeDecisionInformer)

err := manifestWorkReplicaSetInformer.Informer().AddIndexers(
cache.Indexers{
Expand Down Expand Up @@ -114,6 +115,7 @@ func NewManifestWorkReplicaSetController(
}

func newController(workClient workclientset.Interface,
workApplier *workapplier.WorkApplier,
manifestWorkReplicaSetInformer workinformerv1alpha1.ManifestWorkReplicaSetInformer,
manifestWorkInformer workinformerv1.ManifestWorkInformer,
placementInformer clusterinformerv1beta1.PlacementInformer,
Expand All @@ -124,11 +126,20 @@ func newController(workClient workclientset.Interface,
manifestWorkReplicaSetIndexer: manifestWorkReplicaSetInformer.Informer().GetIndexer(),

reconcilers: []ManifestWorkReplicaSetReconcile{
&finalizeReconciler{workApplier: workapplier.NewWorkApplierWithTypedClient(workClient, manifestWorkInformer.Lister()),
workClient: workClient, manifestWorkLister: manifestWorkInformer.Lister()},
&addFinalizerReconciler{workClient: workClient},
&deployReconciler{workApplier: workapplier.NewWorkApplierWithTypedClient(workClient, manifestWorkInformer.Lister()),
manifestWorkLister: manifestWorkInformer.Lister(), placementLister: placementInformer.Lister(), placeDecisionLister: placeDecisionInformer.Lister()},
&finalizeReconciler{
workApplier: workApplier,
workClient: workClient,
manifestWorkLister: manifestWorkInformer.Lister(),
},
&addFinalizerReconciler{
workClient: workClient,
},
&deployReconciler{
workApplier: workApplier,
manifestWorkLister: manifestWorkInformer.Lister(),
placementLister: placementInformer.Lister(),
placeDecisionLister: placeDecisionInformer.Lister(),
},
&statusReconciler{manifestWorkLister: manifestWorkInformer.Lister()},
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
workapplier "open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier"

testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
helpertest "open-cluster-management.io/ocm/pkg/work/hub/test"
Expand Down Expand Up @@ -237,6 +238,7 @@ func TestManifestWorkReplicaSetControllerPatchStatus(t *testing.T) {

ctrl := newController(
fakeClient,
workapplier.NewWorkApplierWithTypedClient(fakeClient, workInformers.Work().V1().ManifestWorks().Lister()),
workInformers.Work().V1alpha1().ManifestWorkReplicaSets(),
workInformers.Work().V1().ManifestWorks(),
clusterInformers.Cluster().V1beta1().Placements(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
clustersdkv1alpha1 "open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1"
workapplier "open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"

"open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/work/helper"
Expand Down Expand Up @@ -271,8 +273,14 @@ func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterN
ObjectMeta: metav1.ObjectMeta{
Name: mwrSet.Name,
Namespace: clusterNS,
Labels: map[string]string{ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet),
ManifestWorkReplicaSetPlacementNameLabelKey: placementRefName},
Labels: map[string]string{
ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet),
ManifestWorkReplicaSetPlacementNameLabelKey: placementRefName,
},
Annotations: map[string]string{
common.CloudEventsDataTypeAnnotationKey: payload.ManifestBundleEventDataType.String(),
common.CloudEventsGenerationAnnotationKey: fmt.Sprintf("%d", mwrSet.Generation),
},
},
Spec: mwrSet.Spec.ManifestWorkTemplate}, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func TestRequeueWithProgressDeadline(t *testing.T) {
placementLister: placementLister,
}

mwrSet, _, err := pmwDeployController.reconcile(context.TODO(), mwrSet)
_, _, err := pmwDeployController.reconcile(context.TODO(), mwrSet)
var rqe helpers.RequeueError
if !errors.As(err, &rqe) {
t.Errorf("expect to get err %t", err)
Expand Down
74 changes: 59 additions & 15 deletions pkg/work/hub/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,56 @@ import (
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
workapplier "open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier"
cloudeventswork "open-cluster-management.io/sdk-go/pkg/cloudevents/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"

"open-cluster-management.io/ocm/pkg/work/hub/controllers/manifestworkreplicasetcontroller"
)

const sourceID = "mwrsctrl"

// WorkHubManagerConfig holds configuration for work hub manager controller
type WorkHubManagerConfig struct {
workOptions *WorkHubManagerOptions
}

// NewWorkHubManagerConfig returns a WorkHubManagerConfig
func NewWorkHubManagerConfig(opts *WorkHubManagerOptions) *WorkHubManagerConfig {
return &WorkHubManagerConfig{
workOptions: opts,
}
}

// RunWorkHubManager starts the controllers on hub.
func RunWorkHubManager(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
hubWorkClient, err := workclientset.NewForConfig(controllerContext.KubeConfig)
func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
hubClusterClient, err := clusterclientset.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}

hubClusterClient, err := clusterclientset.NewForConfig(controllerContext.KubeConfig)
clusterInformerFactory := clusterinformers.NewSharedInformerFactory(hubClusterClient, 30*time.Minute)

// build a hub work client for ManifestWorkReplicaSets
replicaSetsClient, err := workclientset.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}

clusterInformerFactory := clusterinformers.NewSharedInformerFactory(hubClusterClient, 30*time.Minute)
// To support sending ManifestWorks to different drivers (like the Kubernetes apiserver or MQTT broker), we build
// ManifestWork client that implements the ManifestWorkInterface and ManifestWork informer based on different
// driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.
_, config, err := cloudeventswork.NewConfigLoader(c.workOptions.WorkDriver, c.workOptions.WorkDriverConfig).
WithKubeConfig(controllerContext.KubeConfig).
LoadConfig()
if err != nil {
return err
}

// we need a separated filtered manifestwork informers so we only watch the manifestworks that manifestworkreplicaset cares.
// This could reduce a lot of memory consumptions
manifestWorkInformerFactory := workinformers.NewSharedInformerFactoryWithOptions(hubWorkClient, 30*time.Minute, workinformers.WithTweakListOptions(
workInformOption := workinformers.WithTweakListOptions(
func(listOptions *metav1.ListOptions) {
selector := &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
Expand All @@ -43,32 +72,47 @@ func RunWorkHubManager(ctx context.Context, controllerContext *controllercmd.Con
}
listOptions.LabelSelector = metav1.FormatLabelSelector(selector)
},
))
)

return RunControllerManagerWithInformers(ctx, controllerContext, hubWorkClient, manifestWorkInformerFactory, clusterInformerFactory)
clientHolder, err := cloudeventswork.NewClientHolderBuilder(config).
WithClientID(c.workOptions.CloudEventsClientID).
WithSourceID(sourceID).
WithInformerConfig(30*time.Minute, workInformOption).
WithCodecs(codec.NewManifestBundleCodec()).
NewSourceClientHolder(ctx)
if err != nil {
return err
}

return RunControllerManagerWithInformers(ctx, controllerContext, replicaSetsClient, clientHolder, clusterInformerFactory)
}

func RunControllerManagerWithInformers(
ctx context.Context,
controllerContext *controllercmd.ControllerContext,
hubWorkClient workclientset.Interface,
manifestWorkInformers workinformers.SharedInformerFactory,
replicaSetClient workclientset.Interface,
hubWorkClientHolder *cloudeventswork.ClientHolder,
clusterInformers clusterinformers.SharedInformerFactory,
) error {
workInformerFactory := workinformers.NewSharedInformerFactory(hubWorkClient, 30*time.Minute)
replicaSetInformerFactory := workinformers.NewSharedInformerFactory(replicaSetClient, 30*time.Minute)
hubWorkInformer := hubWorkClientHolder.ManifestWorkInformer()

manifestWorkReplicaSetController := manifestworkreplicasetcontroller.NewManifestWorkReplicaSetController(
controllerContext.EventRecorder,
hubWorkClient,
workInformerFactory.Work().V1alpha1().ManifestWorkReplicaSets(),
manifestWorkInformers.Work().V1().ManifestWorks(),
replicaSetClient,
workapplier.NewWorkApplierWithTypedClient(hubWorkClientHolder.WorkInterface(), hubWorkInformer.Lister()),
replicaSetInformerFactory.Work().V1alpha1().ManifestWorkReplicaSets(),
hubWorkInformer,
clusterInformers.Cluster().V1beta1().Placements(),
clusterInformers.Cluster().V1beta1().PlacementDecisions(),
)

go clusterInformers.Start(ctx.Done())
go workInformerFactory.Start(ctx.Done())
go manifestWorkInformers.Start(ctx.Done())
go replicaSetInformerFactory.Start(ctx.Done())
go manifestWorkReplicaSetController.Run(ctx, 5)

go hubWorkInformer.Informer().Run(ctx.Done())

<-ctx.Done()
return nil
}
27 changes: 27 additions & 0 deletions pkg/work/hub/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package hub

import (
"github.com/spf13/pflag"
)

// WorkHubManagerOptions defines the flags for work hub manager
type WorkHubManagerOptions struct {
WorkDriver string
WorkDriverConfig string

CloudEventsClientID string
}

func NewWorkHubManagerOptions() *WorkHubManagerOptions {
return &WorkHubManagerOptions{}
}

// AddFlags register and binds the default flags
func (o *WorkHubManagerOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.WorkDriver, "work-driver",
o.WorkDriver, "The type of work driver, currently it can be kube, mqtt or grpc")
fs.StringVar(&o.WorkDriverConfig, "work-driver-config",
o.WorkDriverConfig, "The config file path of current work driver")
fs.StringVar(&o.CloudEventsClientID, "cloudevents-client-id",
o.CloudEventsClientID, "The ID of the cloudevents client when publishing works with cloudevents")
}
26 changes: 14 additions & 12 deletions pkg/work/spoke/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,19 @@ import (
)

const (
KubeDriver = "kube"
MQTTDriver = "mqtt"
manifestBundleCodecName = "manifestbundle"
manifestCodecName = "manifest"
)

type WorkloadSourceDriver struct {
Type string
Config string
}

// WorkloadAgentOptions defines the flags for workload agent
type WorkloadAgentOptions struct {
StatusSyncInterval time.Duration
AppliedManifestWorkEvictionGracePeriod time.Duration
WorkloadSourceDriver WorkloadSourceDriver
MaxJSONRawLength int32
WorkloadSourceDriver string
WorkloadSourceConfig string
CloudEventsClientID string
CloudEventsClientCodecs []string
}

// NewWorkloadAgentOptions returns the flags with default value set
Expand All @@ -41,8 +39,12 @@ func (o *WorkloadAgentOptions) AddFlags(fs *pflag.FlagSet) {
o.StatusSyncInterval, "Interval to sync resource status to hub.")
fs.DurationVar(&o.AppliedManifestWorkEvictionGracePeriod, "appliedmanifestwork-eviction-grace-period",
o.AppliedManifestWorkEvictionGracePeriod, "Grace period for appliedmanifestwork eviction")
fs.StringVar(&o.WorkloadSourceDriver.Type, "workload-source-driver",
o.WorkloadSourceDriver.Type, "The type of workload source driver, currently it can be kube or mqtt")
fs.StringVar(&o.WorkloadSourceDriver.Config, "workload-source-config",
o.WorkloadSourceDriver.Config, "The config file path of current workload source")
fs.StringVar(&o.WorkloadSourceDriver, "workload-source-driver",
o.WorkloadSourceDriver, "The type of workload source driver, currently it can be kube, mqtt or grpc")
fs.StringVar(&o.WorkloadSourceConfig, "workload-source-config",
o.WorkloadSourceConfig, "The config file path of current workload source")
fs.StringVar(&o.CloudEventsClientID, "cloudevents-client-id",
o.CloudEventsClientID, "The ID of the cloudevents client when workload source source is based on cloudevents")
fs.StringSliceVar(&o.CloudEventsClientCodecs, "cloudevents-client-codecs", o.CloudEventsClientCodecs,
"The codecs for cloudevents client when workload source source is based on cloudevents, the valid codecs: manifest or manifestbundle")
}
Loading

0 comments on commit b0278e4

Please sign in to comment.