Skip to content

Commit

Permalink
support cloudevents for manifestworkreplicaset
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <liuweixa@redhat.com>
  • Loading branch information
skeeey committed Feb 29, 2024
1 parent 1c3cb03 commit ddb6a35
Show file tree
Hide file tree
Showing 59 changed files with 3,842 additions and 225 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
golang.org/x/net v0.19.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.29.1
k8s.io/apiextensions-apiserver v0.29.0
k8s.io/apimachinery v0.29.1
Expand All @@ -35,7 +36,7 @@ require (
k8s.io/utils v0.0.0-20240102154912-e7106e64919e
open-cluster-management.io/addon-framework v0.8.1-0.20240205013730-13fbb6259464
open-cluster-management.io/api v0.13.0
open-cluster-management.io/sdk-go v0.13.0
open-cluster-management.io/sdk-go v0.13.1-0.20240227052220-ae7814c4d512
sigs.k8s.io/controller-runtime v0.16.2
sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96
)
Expand Down Expand Up @@ -142,7 +143,6 @@ require (
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
helm.sh/helm/v3 v3.11.1 // indirect
k8s.io/kms v0.29.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,8 @@ open-cluster-management.io/addon-framework v0.8.1-0.20240205013730-13fbb6259464
open-cluster-management.io/addon-framework v0.8.1-0.20240205013730-13fbb6259464/go.mod h1:SBs6wF0Umzr5/miJb9p8uMaTDbcjphHHQLa76nXnbU8=
open-cluster-management.io/api v0.13.0 h1:dlcJEZlNlE0DmSDctK2s7iWKg9l+Tgb0V78Z040nMuk=
open-cluster-management.io/api v0.13.0/go.mod h1:CuCPEzXDvOyxBB0H1d1eSeajbHqaeGEKq9c63vQc63w=
open-cluster-management.io/sdk-go v0.13.0 h1:ddMGsPUekQr9z03tVN6vF39Uf+WEKMtGU/xSd81HdoA=
open-cluster-management.io/sdk-go v0.13.0/go.mod h1:UnsjzYOrDTF9a8rHEXksoIAtAdO1o5CD5Jtaw6T5B9w=
open-cluster-management.io/sdk-go v0.13.1-0.20240227052220-ae7814c4d512 h1:Fb9laGmR+R2DGYV8k2FMx+IhQQx28904aUeYPN8ssMM=
open-cluster-management.io/sdk-go v0.13.1-0.20240227052220-ae7814c4d512/go.mod h1:UnsjzYOrDTF9a8rHEXksoIAtAdO1o5CD5Jtaw6T5B9w=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y=
sigs.k8s.io/controller-runtime v0.16.2 h1:mwXAVuEk3EQf478PQwQ48zGOXvW27UJc8NHktQVuIPU=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ spec:
args:
- "/work"
- "manager"
- "--work-driver=kube"
{{ if .HostedMode }}
- "--kubeconfig=/var/run/secrets/hub/kubeconfig"
{{ end }}
Expand Down
11 changes: 8 additions & 3 deletions pkg/cmd/hub/work.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@ import (

// NewHubManager generates a command to start hub manager
func NewWorkController() *cobra.Command {
opts := commonoptions.NewOptions()
cmdConfig := opts.
NewControllerCommandConfig("work-manager", version.Get(), hub.RunWorkHubManager)
commonOpts := commonoptions.NewOptions()
hubOpts := hub.NewWorkHubManagerOptions()
hubCfg := hub.NewWorkHubManagerConfig(hubOpts)
cmdConfig := commonOpts.NewControllerCommandConfig("work-manager", version.Get(), hubCfg.RunWorkHubManager)
cmd := cmdConfig.NewCommandWithContext(context.TODO())
cmd.Use = "manager"
cmd.Short = "Start the Work Hub Manager"

flags := cmd.Flags()
commonOpts.AddFlags(flags)
hubOpts.AddFlags(flags)

return cmd
}
18 changes: 18 additions & 0 deletions pkg/common/options/worksource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package options

const (
KubeDriver = "kube"
MQTTDriver = "mqtt"
GRPCDriver = "grpc"
)

const (
ManifestBundleCodec = "manifestbundle"
ManifestCodec = "manifest"
)

type WorkloadSourceDriver struct {
Type string
Codec string
Config string
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

// addFinalizerReconciler is to add finalizer to the manifestworkreplicaset.
type addFinalizerReconciler struct {
workClient workclientset.Interface
replicaSetClient workclientset.Interface
}

func (a *addFinalizerReconciler) reconcile(ctx context.Context, pw *workapiv1alpha1.ManifestWorkReplicaSet,
Expand All @@ -22,7 +22,7 @@ func (a *addFinalizerReconciler) reconcile(ctx context.Context, pw *workapiv1alp

workSetPatcher := patcher.NewPatcher[
*workapiv1alpha1.ManifestWorkReplicaSet, workapiv1alpha1.ManifestWorkReplicaSetSpec, workapiv1alpha1.ManifestWorkReplicaSetStatus](
a.workClient.WorkV1alpha1().ManifestWorkReplicaSets(pw.Namespace))
a.replicaSetClient.WorkV1alpha1().ManifestWorkReplicaSets(pw.Namespace))

updated, err := workSetPatcher.AddFinalizer(ctx, pw, ManifestWorkReplicaSetFinalizer)
// if this conflicts, we'll simply try again later
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestAddFinalizerReconcile(t *testing.T) {
fakeClient := fakeclient.NewSimpleClientset(mwrSetTest)

addFinalizerController := &addFinalizerReconciler{
workClient: fakeClient,
replicaSetClient: fakeClient,
}

_, _, err := addFinalizerController.reconcile(context.TODO(), mwrSetTest)
Expand Down Expand Up @@ -58,10 +58,10 @@ func TestAddFinalizerTwiceReconcile(t *testing.T) {
fakeClient := fakeclient.NewSimpleClientset(mwrSetTest)

addFinalizerController := &addFinalizerReconciler{
workClient: fakeClient,
replicaSetClient: fakeClient,
}

mwrSetTest, _, err := addFinalizerController.reconcile(context.TODO(), mwrSetTest)
_, _, err := addFinalizerController.reconcile(context.TODO(), mwrSetTest)
if err != nil {
t.Fatal(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ const (
)

type ManifestWorkReplicaSetController struct {
workClient workclientset.Interface
replicaSetClient workclientset.Interface
manifestWorkReplicaSetLister worklisterv1alpha1.ManifestWorkReplicaSetLister
manifestWorkReplicaSetIndexer cache.Indexer

Expand All @@ -75,14 +75,15 @@ const (

func NewManifestWorkReplicaSetController(
recorder events.Recorder,
workClient workclientset.Interface,
replicaSetClient workclientset.Interface,
workApplier *workapplier.WorkApplier,
manifestWorkReplicaSetInformer workinformerv1alpha1.ManifestWorkReplicaSetInformer,
manifestWorkInformer workinformerv1.ManifestWorkInformer,
placementInformer clusterinformerv1beta1.PlacementInformer,
placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer) factory.Controller {

controller := newController(
workClient, manifestWorkReplicaSetInformer, manifestWorkInformer, placementInformer, placeDecisionInformer)
replicaSetClient, workApplier, manifestWorkReplicaSetInformer, manifestWorkInformer, placementInformer, placeDecisionInformer)

err := manifestWorkReplicaSetInformer.Informer().AddIndexers(
cache.Indexers{
Expand Down Expand Up @@ -113,22 +114,32 @@ func NewManifestWorkReplicaSetController(
WithSync(controller.sync).ToController("ManifestWorkReplicaSetController", recorder)
}

func newController(workClient workclientset.Interface,
func newController(replicaSetClient workclientset.Interface,
workApplier *workapplier.WorkApplier,
manifestWorkReplicaSetInformer workinformerv1alpha1.ManifestWorkReplicaSetInformer,
manifestWorkInformer workinformerv1.ManifestWorkInformer,
placementInformer clusterinformerv1beta1.PlacementInformer,
placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer) *ManifestWorkReplicaSetController {
return &ManifestWorkReplicaSetController{
workClient: workClient,
replicaSetClient: replicaSetClient,
manifestWorkReplicaSetLister: manifestWorkReplicaSetInformer.Lister(),
manifestWorkReplicaSetIndexer: manifestWorkReplicaSetInformer.Informer().GetIndexer(),

reconcilers: []ManifestWorkReplicaSetReconcile{
&finalizeReconciler{workApplier: workapplier.NewWorkApplierWithTypedClient(workClient, manifestWorkInformer.Lister()),
workClient: workClient, manifestWorkLister: manifestWorkInformer.Lister()},
&addFinalizerReconciler{workClient: workClient},
&deployReconciler{workApplier: workapplier.NewWorkApplierWithTypedClient(workClient, manifestWorkInformer.Lister()),
manifestWorkLister: manifestWorkInformer.Lister(), placementLister: placementInformer.Lister(), placeDecisionLister: placeDecisionInformer.Lister()},
&finalizeReconciler{
workApplier: workApplier,
replicaSetClient: replicaSetClient,
manifestWorkLister: manifestWorkInformer.Lister(),
},
&addFinalizerReconciler{
replicaSetClient: replicaSetClient,
},
&deployReconciler{
workApplier: workApplier,
manifestWorkLister: manifestWorkInformer.Lister(),
placementLister: placementInformer.Lister(),
placeDecisionLister: placeDecisionInformer.Lister(),
},
&statusReconciler{manifestWorkLister: manifestWorkInformer.Lister()},
},
}
Expand Down Expand Up @@ -176,7 +187,7 @@ func (m *ManifestWorkReplicaSetController) sync(ctx context.Context, controllerC

workSetPatcher := patcher.NewPatcher[
*workapiv1alpha1.ManifestWorkReplicaSet, workapiv1alpha1.ManifestWorkReplicaSetSpec, workapiv1alpha1.ManifestWorkReplicaSetStatus](
m.workClient.WorkV1alpha1().ManifestWorkReplicaSets(namespace))
m.replicaSetClient.WorkV1alpha1().ManifestWorkReplicaSets(namespace))

// Patch status
if _, err := workSetPatcher.PatchStatus(ctx, manifestWorkReplicaSet, manifestWorkReplicaSet.Status, oldManifestWorkReplicaSet.Status); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
workapplier "open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier"

testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
helpertest "open-cluster-management.io/ocm/pkg/work/hub/test"
Expand Down Expand Up @@ -237,6 +238,7 @@ func TestManifestWorkReplicaSetControllerPatchStatus(t *testing.T) {

ctrl := newController(
fakeClient,
workapplier.NewWorkApplierWithTypedClient(fakeClient, workInformers.Work().V1().ManifestWorks().Lister()),
workInformers.Work().V1alpha1().ManifestWorkReplicaSets(),
workInformers.Work().V1().ManifestWorks(),
clusterInformers.Cluster().V1beta1().Placements(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
clustersdkv1alpha1 "open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1"
workapplier "open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"

"open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/work/helper"
Expand Down Expand Up @@ -271,8 +273,14 @@ func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterN
ObjectMeta: metav1.ObjectMeta{
Name: mwrSet.Name,
Namespace: clusterNS,
Labels: map[string]string{ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet),
ManifestWorkReplicaSetPlacementNameLabelKey: placementRefName},
Labels: map[string]string{
ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet),
ManifestWorkReplicaSetPlacementNameLabelKey: placementRefName,
},
Annotations: map[string]string{
common.CloudEventsDataTypeAnnotationKey: payload.ManifestBundleEventDataType.String(),
common.CloudEventsGenerationAnnotationKey: fmt.Sprintf("%d", mwrSet.Generation),
},
},
Spec: mwrSet.Spec.ManifestWorkTemplate}, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func TestRequeueWithProgressDeadline(t *testing.T) {
placementLister: placementLister,
}

mwrSet, _, err := pmwDeployController.reconcile(context.TODO(), mwrSet)
_, _, err := pmwDeployController.reconcile(context.TODO(), mwrSet)
var rqe helpers.RequeueError
if !errors.As(err, &rqe) {
t.Errorf("expect to get err %t", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
// finalizeReconciler is to finalize the manifestWorkReplicaSet by deleting all related manifestWorks.
type finalizeReconciler struct {
workApplier *workapplier.WorkApplier
workClient workclientset.Interface
replicaSetClient workclientset.Interface
manifestWorkLister worklisterv1.ManifestWorkLister
}

Expand All @@ -32,7 +32,7 @@ func (f *finalizeReconciler) reconcile(ctx context.Context, mwrSet *workapiv1alp

workSetPatcher := patcher.NewPatcher[
*workapiv1alpha1.ManifestWorkReplicaSet, workapiv1alpha1.ManifestWorkReplicaSetSpec, workapiv1alpha1.ManifestWorkReplicaSetStatus](
f.workClient.WorkV1alpha1().ManifestWorkReplicaSets(mwrSet.Namespace))
f.replicaSetClient.WorkV1alpha1().ManifestWorkReplicaSets(mwrSet.Namespace))

// Remove finalizer after delete all created Manifestworks
if err := workSetPatcher.RemoveFinalizer(ctx, mwrSet, ManifestWorkReplicaSetFinalizer); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestFinalizeReconcile(t *testing.T) {
mwLister := manifestWorkInformerFactory.Work().V1().ManifestWorks().Lister()

finalizerController := finalizeReconciler{
workClient: fakeClient,
replicaSetClient: fakeClient,
manifestWorkLister: mwLister,
workApplier: workapplier.NewWorkApplierWithTypedClient(fakeClient, mwLister),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ func TestPlaceMWControllerIndex(t *testing.T) {
placementDecisionLister := clusterInformerFactory.Cluster().V1beta1().PlacementDecisions().Lister()

pmwController := &ManifestWorkReplicaSetController{
workClient: fWorkClient,
replicaSetClient: fWorkClient,
manifestWorkReplicaSetLister: workInformerFactory.Work().V1alpha1().ManifestWorkReplicaSets().Lister(),
manifestWorkReplicaSetIndexer: workInformerFactory.Work().V1alpha1().ManifestWorkReplicaSets().Informer().GetIndexer(),

reconcilers: []ManifestWorkReplicaSetReconcile{
&finalizeReconciler{workApplier: workapplier.NewWorkApplierWithTypedClient(fWorkClient, mwLister),
workClient: fWorkClient, manifestWorkLister: mwLister},
&addFinalizerReconciler{workClient: fWorkClient},
replicaSetClient: fWorkClient, manifestWorkLister: mwLister},
&addFinalizerReconciler{replicaSetClient: fWorkClient},
&deployReconciler{workApplier: workapplier.NewWorkApplierWithTypedClient(fWorkClient, mwLister),
manifestWorkLister: mwLister, placementLister: placementLister, placeDecisionLister: placementDecisionLister},
&statusReconciler{manifestWorkLister: mwLister},
Expand Down
Loading

0 comments on commit ddb6a35

Please sign in to comment.