Skip to content

Commit

Permalink
Add some comments. Started SharedStrategyController differently
Browse files Browse the repository at this point in the history
  • Loading branch information
Eikykun committed Apr 28, 2024
1 parent a367b89 commit f5326a0
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 69 deletions.
9 changes: 8 additions & 1 deletion pkg/controllers/collaset/collaset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ func AddToMgr(mgr ctrl.Manager, r reconcile.Reconciler) error {
if err != nil {
return err
}

// Only for starting SharedStrategyController
err = c.Watch(strategy.SharedStrategyController, &handler.Funcs{})
if err != nil {
return err
}

ch := make(chan event.GenericEvent, 1<<10)
strategy.SharedStrategyController.RegisterGenericEventChannel(ch)
// Watch PodDecoration related events
Expand Down Expand Up @@ -177,7 +184,7 @@ func (r *CollaSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
UpdatedRevision: updatedRevision.Name,
}

getter, err := utilspoddecoration.NewPodDecorationGetter(ctx, r.Client, instance.Namespace)
getter, err := utilspoddecoration.NewPodDecorationGetter(r.Client, instance.Namespace)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/controllers/poddecoration/poddecoration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {

// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
// Add PodDecoration strategy manager runnable
if err := addRunner(mgr); err != nil {
return err
}
// Create a new controller
c, err := controller.New("poddecoration-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
Expand Down
27 changes: 0 additions & 27 deletions pkg/controllers/poddecoration/runner.go

This file was deleted.

11 changes: 6 additions & 5 deletions pkg/controllers/utils/poddecoration/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,7 @@ type namespacedPodDecorationManager struct {
revisions map[string]*appsv1alpha1.PodDecoration
}

func NewPodDecorationGetter(ctx context.Context, c client.Client, namespace string) (Getter, error) {
// Wait for PodDecoration strategy manager runnable started
if !strategy.SharedStrategyController.WaitForSync(ctx) {
return nil, fmt.Errorf("PodDecorationGetter WaitForCacheSync did not successfully complete")
}
func NewPodDecorationGetter(c client.Client, namespace string) (Getter, error) {
getter := &namespacedPodDecorationManager{
c: c,
controller: strategy.SharedStrategyController,
Expand Down Expand Up @@ -116,6 +112,11 @@ func (n *namespacedPodDecorationManager) GetEffective(ctx context.Context, pod *
return n.GetByRevisions(ctx, append(updatedRevisions.List(), stableRevisions.List()...)...)
}

// getEffectiveRevisions returns the revisions of the PodDecorations that are currently in effect for the pod.
// If the Pod is selected by the spec.updateStrategy, they are placed in the updatedRevisions, representing the
// versions that are being updated. If not selected by UpdateStrategy, it falls back to using CurrentRevision
// and Pod’s OldRevision, and adds it to stableRevisions. If OldRevision and CurrentRevision are from the same
// PodDecoration, the OldRevision is preferred.
func (n *namespacedPodDecorationManager) getEffectiveRevisions(pod *corev1.Pod, oldRevMap map[string]string) (updatedRevisions, stableRevisions sets.String) {
// oldRevMap, PDName: revision
// updateRevMap, PDName: revision
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/utils/poddecoration/getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ var _ = Describe("Test PodDecoration getter", func() {
podDecoration.Status.UpdatedRevision = updatedRevision
Expect(strategy.SharedStrategyController.UpdateSelectedPods(ctx, podDecoration, nil)).Should(BeNil())
strategy.SharedStrategyController.Synced()
getter, err := NewPodDecorationGetter(ctx, c, testcase)
getter, err := NewPodDecorationGetter(c, testcase)
tu := true
po0 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand Down
62 changes: 35 additions & 27 deletions pkg/controllers/utils/poddecoration/strategy/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ import (
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/source"

appsv1alpha1 "kusionstack.io/operating/apis/apps/v1alpha1"
"kusionstack.io/operating/pkg/controllers/utils"
Expand All @@ -47,15 +50,12 @@ const (

var SharedStrategyController Controller

var _ manager.Runnable = &strategyManager{}
var _ inject.Client = &strategyManager{}

type Controller interface {
Updater
Reader

// Start will load the PodDecoration resources in controller cache. No blocking.
// It will stop running when the context is closed.
Start(context.Context) error
source.SyncingSource
// RegisterGenericEventChannel registers a channel to listen for changes associated with the CollaSet.
RegisterGenericEventChannel(chan<- event.GenericEvent)
// InjectClient inject manager client into Controller
Expand All @@ -73,8 +73,6 @@ type Updater interface {
}

type Reader interface {
// WaitForSync waits for all PodDecoration managers cache were synced.
WaitForSync(ctx context.Context) bool
// LatestPodDecorations are a set of the most recent PodDecorations in the namespace.
LatestPodDecorations(namespace string) []*appsv1alpha1.PodDecoration
// EffectivePodRevisions is used to select the suitable version from the UpdatedRevision
Expand All @@ -88,27 +86,50 @@ func init() {
}
}

type strategyManager struct {
client.Client
// PDNamespace:PDName:Manager
managers map[string]map[string]*podDecorationManager
listeners []chan<- event.GenericEvent
synced bool
mu sync.RWMutex
}

func (m *strategyManager) Start(ctx context.Context, h handler.EventHandler, q workqueue.RateLimitingInterface, p ...predicate.Predicate) error {
return m.start(ctx)
}

func (m *strategyManager) RegisterGenericEventChannel(ch chan<- event.GenericEvent) {
m.mu.Lock()
defer m.mu.Unlock()
m.listeners = append(m.listeners, ch)
}

func (m *strategyManager) Start(ctx context.Context) error {
// start load the PodDecoration resources in controller cache. No blocking.
func (m *strategyManager) start(ctx context.Context) error {
if m.HasSynced() {
return nil
}
if err := m.syncAllPodDecorations(ctx); err != nil {
return err
}
m.Synced()
return nil
}

func (m *strategyManager) syncAllPodDecorations(ctx context.Context) error {
allPodDecorations := &appsv1alpha1.PodDecorationList{}
if err := m.List(ctx, allPodDecorations); err != nil {
return err
}
q := workqueue.New()

for i := range allPodDecorations.Items {
pd := &allPodDecorations.Items[i]
if pd.DeletionTimestamp != nil {
continue
}
q.Add(types.NamespacedName{Namespace: pd.Namespace, Name: pd.Name})
}
defer m.Synced()
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -154,15 +175,6 @@ func (m *strategyManager) Start(ctx context.Context) error {
return nil
}

type strategyManager struct {
client.Client
// PDNamespace:PDName:Manager
managers map[string]map[string]*podDecorationManager
listeners []chan<- event.GenericEvent
synced bool
mu sync.RWMutex
}

func (m *strategyManager) HasSynced() bool {
m.mu.RLock()
defer m.mu.RUnlock()
Expand All @@ -180,16 +192,12 @@ func (m *strategyManager) InjectClient(c client.Client) error {
return nil
}

func (m *strategyManager) WaitForSync(ctx context.Context) bool {
err := wait.PollImmediateUntilWithContext(ctx, syncedPollPeriod,
// WaitForSync waits for all PodDecoration managers cache were synced.
func (m *strategyManager) WaitForSync(ctx context.Context) error {
return wait.PollImmediateUntilWithContext(ctx, syncedPollPeriod,
func(context.Context) (bool, error) {
return m.HasSynced(), nil
})
if err != nil {
klog.V(2).Infof("stop requested")
return false
}
return true
}

func (m *strategyManager) LatestPodDecorations(namespace string) (pds []*appsv1alpha1.PodDecoration) {
Expand Down
10 changes: 6 additions & 4 deletions pkg/controllers/utils/poddecoration/strategy/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ var _ = Describe("Test PodDecoration strategy manager", func() {
podDecoration.Status.CurrentRevision = "1"
podDecoration.Status.UpdatedRevision = "2"
Expect(c.Status().Update(ctx, podDecoration)).Should(BeNil())
Expect(SharedStrategyController.Start(ctx)).NotTo(HaveOccurred())
Expect(SharedStrategyController.WaitForSync(ctx)).Should(Equal(true))
SharedStrategyController.(*strategyManager).synced = false
Expect(SharedStrategyController.Start(ctx, nil, nil)).NotTo(HaveOccurred())
Expect(SharedStrategyController.WaitForSync(ctx)).Should(BeNil())
Expect(len(SharedStrategyController.LatestPodDecorations(testcase))).Should(Equal(1))
updatedRevisions, stableRevisions := SharedStrategyController.EffectivePodRevisions(pods[0])
Expect(len(updatedRevisions)).Should(Equal(0))
Expand Down Expand Up @@ -248,8 +249,9 @@ var _ = Describe("Test PodDecoration strategy manager", func() {
podDecoration.Status.CurrentRevision = "1"
podDecoration.Status.UpdatedRevision = "2"
Expect(c.Status().Update(ctx, podDecoration)).Should(BeNil())
Expect(SharedStrategyController.Start(ctx)).NotTo(HaveOccurred())
Expect(SharedStrategyController.WaitForSync(ctx)).Should(Equal(true))
SharedStrategyController.(*strategyManager).synced = false
Expect(SharedStrategyController.Start(ctx, nil, nil)).NotTo(HaveOccurred())
Expect(SharedStrategyController.WaitForSync(ctx)).Should(BeNil())
Expect(len(SharedStrategyController.LatestPodDecorations(testcase))).Should(Equal(1))
// order: [pod-1], [pod-2, pod-3]
updatedRevisions, stableRevisions := SharedStrategyController.EffectivePodRevisions(pods[0])
Expand Down
3 changes: 3 additions & 0 deletions pkg/controllers/utils/poddecoration/strategy/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ func (p *sortedPodInfo) Less(i, j int) bool {
return imatch
}
if p.infos[i].revision != p.infos[j].revision {
// Move the latest version to the front,
// ensuring that the Pod selected by the partition is always ahead
return p.infos[i].revision > p.infos[j].revision
}
// TODO: more sort method
// Default sort by pod instance id in ResourceContext
return p.infos[i].instanceId < p.infos[j].instanceId
}

Expand Down

0 comments on commit f5326a0

Please sign in to comment.