Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-0.19] 🐛fix(controller): support WaitForSync in custom TypedSyncingSource #3087

Merged
merged 2 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (c *Controller[request]) Start(ctx context.Context) error {
c.LogConstructor(nil).Info("Starting Controller")

for _, watch := range c.startWatches {
syncingSource, ok := watch.(source.SyncingSource)
syncingSource, ok := watch.(source.TypedSyncingSource[request])
if !ok {
continue
}
Expand Down
76 changes: 76 additions & 0 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
)

type TestRequest struct {
Key string
}

var _ = Describe("controller", func() {
var fakeReconcile *fakeReconciler
var ctrl *Controller[reconcile.Request]
Expand Down Expand Up @@ -323,6 +327,41 @@ var _ = Describe("controller", func() {
Expect(err.Error()).To(Equal("controller was started more than once. This is likely to be caused by being added to a manager multiple times"))
})

It("should check for correct TypedSyncingSource if custom types are used", func() {
queue := &controllertest.TypedQueue[TestRequest]{
TypedInterface: workqueue.NewTyped[TestRequest](),
}
ctrl := &Controller[TestRequest]{
NewQueue: func(string, workqueue.TypedRateLimiter[TestRequest]) workqueue.TypedRateLimitingInterface[TestRequest] {
return queue
},
LogConstructor: func(*TestRequest) logr.Logger {
return log.RuntimeLog.WithName("controller").WithName("test")
},
}
ctrl.CacheSyncTimeout = time.Second
src := &bisignallingSource[TestRequest]{
startCall: make(chan workqueue.TypedRateLimitingInterface[TestRequest]),
startDone: make(chan error, 1),
waitCall: make(chan struct{}),
waitDone: make(chan error, 1),
}
ctrl.startWatches = []source.TypedSource[TestRequest]{src}
ctrl.Name = "foo"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
startCh := make(chan error)
go func() {
defer GinkgoRecover()
startCh <- ctrl.Start(ctx)
}()
Eventually(src.startCall).Should(Receive(Equal(queue)))
src.startDone <- nil
Eventually(src.waitCall).Should(BeClosed())
src.waitDone <- nil
cancel()
Eventually(startCh).Should(Receive(Succeed()))
})
})

Describe("Processing queue items from a Controller", func() {
Expand Down Expand Up @@ -875,3 +914,40 @@ func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Conte
<-ctx.Done()
return nil, errors.New("GetInformer timed out")
}

type bisignallingSource[T comparable] struct {
// receives the queue that is passed to Start
startCall chan workqueue.TypedRateLimitingInterface[T]
// passes an error to return from Start
startDone chan error
// closed when WaitForSync is called
waitCall chan struct{}
// passes an error to return from WaitForSync
waitDone chan error
}

var _ source.TypedSyncingSource[int] = (*bisignallingSource[int])(nil)

func (t *bisignallingSource[T]) Start(ctx context.Context, q workqueue.TypedRateLimitingInterface[T]) error {
select {
case t.startCall <- q:
case <-ctx.Done():
return ctx.Err()
}
select {
case err := <-t.startDone:
return err
case <-ctx.Done():
return ctx.Err()
}
}

func (t *bisignallingSource[T]) WaitForSync(ctx context.Context) error {
close(t.waitCall)
select {
case err := <-t.waitDone:
return err
case <-ctx.Done():
return ctx.Err()
}
}
Loading