Skip to content

Commit

Permalink
support cloudevents for manifestworkreplicaset
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <liuweixa@redhat.com>
  • Loading branch information
skeeey committed Feb 29, 2024
1 parent 1c3cb03 commit be2e698
Show file tree
Hide file tree
Showing 55 changed files with 3,831 additions and 214 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
golang.org/x/net v0.19.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.29.1
k8s.io/apiextensions-apiserver v0.29.0
k8s.io/apimachinery v0.29.1
Expand All @@ -35,7 +36,7 @@ require (
k8s.io/utils v0.0.0-20240102154912-e7106e64919e
open-cluster-management.io/addon-framework v0.8.1-0.20240205013730-13fbb6259464
open-cluster-management.io/api v0.13.0
open-cluster-management.io/sdk-go v0.13.0
open-cluster-management.io/sdk-go v0.13.1-0.20240229071905-c56dc0fe85fa
sigs.k8s.io/controller-runtime v0.16.2
sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96
)
Expand Down Expand Up @@ -142,7 +143,6 @@ require (
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
helm.sh/helm/v3 v3.11.1 // indirect
k8s.io/kms v0.29.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,8 @@ open-cluster-management.io/addon-framework v0.8.1-0.20240205013730-13fbb6259464
open-cluster-management.io/addon-framework v0.8.1-0.20240205013730-13fbb6259464/go.mod h1:SBs6wF0Umzr5/miJb9p8uMaTDbcjphHHQLa76nXnbU8=
open-cluster-management.io/api v0.13.0 h1:dlcJEZlNlE0DmSDctK2s7iWKg9l+Tgb0V78Z040nMuk=
open-cluster-management.io/api v0.13.0/go.mod h1:CuCPEzXDvOyxBB0H1d1eSeajbHqaeGEKq9c63vQc63w=
open-cluster-management.io/sdk-go v0.13.0 h1:ddMGsPUekQr9z03tVN6vF39Uf+WEKMtGU/xSd81HdoA=
open-cluster-management.io/sdk-go v0.13.0/go.mod h1:UnsjzYOrDTF9a8rHEXksoIAtAdO1o5CD5Jtaw6T5B9w=
open-cluster-management.io/sdk-go v0.13.1-0.20240229071905-c56dc0fe85fa h1:sKW0i/6GA1OxoZ/VoxDCRRTrKj3BmS3tLCTk9rCAMpo=
open-cluster-management.io/sdk-go v0.13.1-0.20240229071905-c56dc0fe85fa/go.mod h1:UnsjzYOrDTF9a8rHEXksoIAtAdO1o5CD5Jtaw6T5B9w=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y=
sigs.k8s.io/controller-runtime v0.16.2 h1:mwXAVuEk3EQf478PQwQ48zGOXvW27UJc8NHktQVuIPU=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ spec:
args:
- "/work"
- "manager"
- "--work-driver=kube"
{{ if .HostedMode }}
- "--kubeconfig=/var/run/secrets/hub/kubeconfig"
{{ end }}
Expand Down
11 changes: 8 additions & 3 deletions pkg/cmd/hub/work.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@ import (

// NewHubManager generates a command to start hub manager
func NewWorkController() *cobra.Command {
opts := commonoptions.NewOptions()
cmdConfig := opts.
NewControllerCommandConfig("work-manager", version.Get(), hub.RunWorkHubManager)
commonOpts := commonoptions.NewOptions()
hubOpts := hub.NewWorkHubManagerOptions()
hubCfg := hub.NewWorkHubManagerConfig(hubOpts)
cmdConfig := commonOpts.NewControllerCommandConfig("work-manager", version.Get(), hubCfg.RunWorkHubManager)
cmd := cmdConfig.NewCommandWithContext(context.TODO())
cmd.Use = "manager"
cmd.Short = "Start the Work Hub Manager"

flags := cmd.Flags()
commonOpts.AddFlags(flags)
hubOpts.AddFlags(flags)

return cmd
}
18 changes: 18 additions & 0 deletions pkg/common/options/worksource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package options

const (
KubeDriver = "kube"
MQTTDriver = "mqtt"
GRPCDriver = "grpc"
)

const (
ManifestBundleCodec = "manifestbundle"
ManifestCodec = "manifest"
)

type WorkloadSourceDriver struct {
Type string
Codec string
Config string
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestAddFinalizerTwiceReconcile(t *testing.T) {
workClient: fakeClient,
}

mwrSetTest, _, err := addFinalizerController.reconcile(context.TODO(), mwrSetTest)
_, _, err := addFinalizerController.reconcile(context.TODO(), mwrSetTest)
if err != nil {
t.Fatal(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,14 @@ const (
func NewManifestWorkReplicaSetController(
recorder events.Recorder,
workClient workclientset.Interface,
workApplier *workapplier.WorkApplier,
manifestWorkReplicaSetInformer workinformerv1alpha1.ManifestWorkReplicaSetInformer,
manifestWorkInformer workinformerv1.ManifestWorkInformer,
placementInformer clusterinformerv1beta1.PlacementInformer,
placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer) factory.Controller {

controller := newController(
workClient, manifestWorkReplicaSetInformer, manifestWorkInformer, placementInformer, placeDecisionInformer)
workClient, workApplier, manifestWorkReplicaSetInformer, manifestWorkInformer, placementInformer, placeDecisionInformer)

err := manifestWorkReplicaSetInformer.Informer().AddIndexers(
cache.Indexers{
Expand Down Expand Up @@ -114,6 +115,7 @@ func NewManifestWorkReplicaSetController(
}

func newController(workClient workclientset.Interface,
workApplier *workapplier.WorkApplier,
manifestWorkReplicaSetInformer workinformerv1alpha1.ManifestWorkReplicaSetInformer,
manifestWorkInformer workinformerv1.ManifestWorkInformer,
placementInformer clusterinformerv1beta1.PlacementInformer,
Expand All @@ -124,11 +126,20 @@ func newController(workClient workclientset.Interface,
manifestWorkReplicaSetIndexer: manifestWorkReplicaSetInformer.Informer().GetIndexer(),

reconcilers: []ManifestWorkReplicaSetReconcile{
&finalizeReconciler{workApplier: workapplier.NewWorkApplierWithTypedClient(workClient, manifestWorkInformer.Lister()),
workClient: workClient, manifestWorkLister: manifestWorkInformer.Lister()},
&addFinalizerReconciler{workClient: workClient},
&deployReconciler{workApplier: workapplier.NewWorkApplierWithTypedClient(workClient, manifestWorkInformer.Lister()),
manifestWorkLister: manifestWorkInformer.Lister(), placementLister: placementInformer.Lister(), placeDecisionLister: placeDecisionInformer.Lister()},
&finalizeReconciler{
workApplier: workApplier,
workClient: workClient,
manifestWorkLister: manifestWorkInformer.Lister(),
},
&addFinalizerReconciler{
workClient: workClient,
},
&deployReconciler{
workApplier: workApplier,
manifestWorkLister: manifestWorkInformer.Lister(),
placementLister: placementInformer.Lister(),
placeDecisionLister: placeDecisionInformer.Lister(),
},
&statusReconciler{manifestWorkLister: manifestWorkInformer.Lister()},
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
workapplier "open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier"

testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
helpertest "open-cluster-management.io/ocm/pkg/work/hub/test"
Expand Down Expand Up @@ -237,6 +238,7 @@ func TestManifestWorkReplicaSetControllerPatchStatus(t *testing.T) {

ctrl := newController(
fakeClient,
workapplier.NewWorkApplierWithTypedClient(fakeClient, workInformers.Work().V1().ManifestWorks().Lister()),
workInformers.Work().V1alpha1().ManifestWorkReplicaSets(),
workInformers.Work().V1().ManifestWorks(),
clusterInformers.Cluster().V1beta1().Placements(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
clustersdkv1alpha1 "open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1"
workapplier "open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"

"open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/work/helper"
Expand Down Expand Up @@ -271,8 +273,14 @@ func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterN
ObjectMeta: metav1.ObjectMeta{
Name: mwrSet.Name,
Namespace: clusterNS,
Labels: map[string]string{ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet),
ManifestWorkReplicaSetPlacementNameLabelKey: placementRefName},
Labels: map[string]string{
ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet),
ManifestWorkReplicaSetPlacementNameLabelKey: placementRefName,
},
Annotations: map[string]string{
common.CloudEventsDataTypeAnnotationKey: payload.ManifestBundleEventDataType.String(),
common.CloudEventsGenerationAnnotationKey: fmt.Sprintf("%d", mwrSet.Generation),
},
},
Spec: mwrSet.Spec.ManifestWorkTemplate}, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func TestRequeueWithProgressDeadline(t *testing.T) {
placementLister: placementLister,
}

mwrSet, _, err := pmwDeployController.reconcile(context.TODO(), mwrSet)
_, _, err := pmwDeployController.reconcile(context.TODO(), mwrSet)
var rqe helpers.RequeueError
if !errors.As(err, &rqe) {
t.Errorf("expect to get err %t", err)
Expand Down
151 changes: 122 additions & 29 deletions pkg/work/hub/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,73 +2,166 @@ package hub

import (
"context"
"fmt"
"time"

"github.com/openshift/library-go/pkg/controller/controllercmd"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

clusterclientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
workapplier "open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
cloudeventswork "open-cluster-management.io/sdk-go/pkg/cloudevents/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"

"open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/work/hub/controllers/manifestworkreplicasetcontroller"
)

// RunWorkHubManager starts the controllers on hub.
func RunWorkHubManager(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
hubWorkClient, err := workclientset.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
const (
sourceID = "mwrsctrl"
sourceClientID = "mwrsctrl-client"
)

// WorkHubManagerConfig holds configuration for work hub manager controller
type WorkHubManagerConfig struct {
workOptions *WorkHubManagerOptions
}

// NewWorkHubManagerConfig returns a WorkHubManagerConfig
func NewWorkHubManagerConfig(opts *WorkHubManagerOptions) *WorkHubManagerConfig {
return &WorkHubManagerConfig{
workOptions: opts,
}
}

// RunWorkHubManager starts the controllers on hub.
func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
hubClusterClient, err := clusterclientset.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}

clusterInformerFactory := clusterinformers.NewSharedInformerFactory(hubClusterClient, 30*time.Minute)

// we need a separated filtered manifestwork informers so we only watch the manifestworks that manifestworkreplicaset cares.
// This could reduce a lot of memory consumptions
manifestWorkInformerFactory := workinformers.NewSharedInformerFactoryWithOptions(hubWorkClient, 30*time.Minute, workinformers.WithTweakListOptions(
func(listOptions *metav1.ListOptions) {
selector := &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: manifestworkreplicasetcontroller.ManifestWorkReplicaSetControllerNameLabelKey,
Operator: metav1.LabelSelectorOpExists,
},
},
}
listOptions.LabelSelector = metav1.FormatLabelSelector(selector)
},
))
// build a hub work client for ManifestWorkReplicaSets
replicaSetsClient, err := workclientset.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}

// build hub client and informer
clientHolder, err := c.buildHubClientHolder(ctx, controllerContext)
if err != nil {
return err
}

return RunControllerManagerWithInformers(ctx, controllerContext, hubWorkClient, manifestWorkInformerFactory, clusterInformerFactory)
return RunControllerManagerWithInformers(ctx, controllerContext, replicaSetsClient, clientHolder, clusterInformerFactory)
}

func RunControllerManagerWithInformers(
ctx context.Context,
controllerContext *controllercmd.ControllerContext,
hubWorkClient workclientset.Interface,
manifestWorkInformers workinformers.SharedInformerFactory,
replicaSetClient workclientset.Interface,
hubWorkClientHolder *cloudeventswork.ClientHolder,
clusterInformers clusterinformers.SharedInformerFactory,
) error {
workInformerFactory := workinformers.NewSharedInformerFactory(hubWorkClient, 30*time.Minute)
replicaSetInformerFactory := workinformers.NewSharedInformerFactory(replicaSetClient, 30*time.Minute)
hubWorkInformer := hubWorkClientHolder.ManifestWorkInformer()

manifestWorkReplicaSetController := manifestworkreplicasetcontroller.NewManifestWorkReplicaSetController(
controllerContext.EventRecorder,
hubWorkClient,
workInformerFactory.Work().V1alpha1().ManifestWorkReplicaSets(),
manifestWorkInformers.Work().V1().ManifestWorks(),
replicaSetClient,
workapplier.NewWorkApplierWithTypedClient(hubWorkClientHolder.WorkInterface(), hubWorkInformer.Lister()),
replicaSetInformerFactory.Work().V1alpha1().ManifestWorkReplicaSets(),
hubWorkInformer,
clusterInformers.Cluster().V1beta1().Placements(),
clusterInformers.Cluster().V1beta1().PlacementDecisions(),
)

go clusterInformers.Start(ctx.Done())
go workInformerFactory.Start(ctx.Done())
go manifestWorkInformers.Start(ctx.Done())
go replicaSetInformerFactory.Start(ctx.Done())
go manifestWorkReplicaSetController.Run(ctx, 5)

go hubWorkInformer.Informer().Run(ctx.Done())

<-ctx.Done()
return nil
}

func (c *WorkHubManagerConfig) buildHubClientHolder(
ctx context.Context, controllerContext *controllercmd.ControllerContext) (*cloudeventswork.ClientHolder, error) {
switch c.workOptions.WorkloadSourceDriver.Type {
case options.KubeDriver:
var err error
var hubRestConfig *rest.Config
if c.workOptions.WorkloadSourceDriver.Config == "" {
hubRestConfig = controllerContext.KubeConfig
} else {
hubRestConfig, err = clientcmd.BuildConfigFromFlags("", c.workOptions.WorkloadSourceDriver.Config)
if err != nil {
return nil, err
}
}

// we need a separated filtered manifestwork informers so we only watch the manifestworks that manifestworkreplicaset cares.
// This could reduce a lot of memory consumptions
clientHolder, err := cloudeventswork.NewClientHolderBuilder(sourceClientID, hubRestConfig).
WithInformerConfig(30*time.Minute, workinformers.WithTweakListOptions(
func(listOptions *metav1.ListOptions) {
selector := &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: manifestworkreplicasetcontroller.ManifestWorkReplicaSetControllerNameLabelKey,
Operator: metav1.LabelSelectorOpExists,
},
},
}
listOptions.LabelSelector = metav1.FormatLabelSelector(selector)
},
)).
NewClientHolder(ctx)
if err != nil {
return nil, err
}

return clientHolder, nil
case options.MQTTDriver:
mqttOptions, err := mqtt.BuildMQTTOptionsFromFlags(c.workOptions.WorkloadSourceDriver.Config)
if err != nil {
return nil, err
}

clientHolder, err := cloudeventswork.NewClientHolderBuilder(sourceClientID, mqttOptions).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
NewClientHolder(ctx)
if err != nil {
return nil, err
}

return clientHolder, nil
case options.GRPCDriver:
grpcOptions, err := grpc.BuildGRPCOptionsFromFlags(c.workOptions.WorkloadSourceDriver.Config)
if err != nil {
return nil, err
}

clientHolder, err := cloudeventswork.NewClientHolderBuilder(sourceClientID, grpcOptions).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
NewClientHolder(ctx)
if err != nil {
return nil, err
}
return clientHolder, nil
}

return nil, fmt.Errorf("unsupported driver %s", c.workOptions.WorkloadSourceDriver.Type)
}
Loading

0 comments on commit be2e698

Please sign in to comment.