Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌱 Add watch to in-memory server multiplexer #8851

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Cache interface {
// Informer forwards events to event handlers.
type Informer interface {
AddEventHandler(handler InformEventHandler) error
RemoveEventHandler(handler InformEventHandler) error
}

// InformEventHandler handle events originated by a source.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,11 +409,8 @@ func (c *cache) doTryDeleteLocked(resourceGroup string, tracker *resourceGroupTr
delete(tracker.ownedObjects, ownReference{gvk: objGVK, key: objKey})
}

// If the object still has finalizers, only set the deletion timestamp if not already set.
if len(obj.GetFinalizers()) > 0 {
if !obj.GetDeletionTimestamp().IsZero() {
return false, nil
}
// Set the deletion timestamp if not already set.
if obj.GetDeletionTimestamp().IsZero() {
killianmuldoon marked this conversation as resolved.
Show resolved Hide resolved
if err := c.beforeDelete(resourceGroup, obj); err != nil {
return false, apierrors.NewBadRequest(err.Error())
}
Expand All @@ -424,13 +421,18 @@ func (c *cache) doTryDeleteLocked(resourceGroup string, tracker *resourceGroupTr
if err := c.beforeUpdate(resourceGroup, oldObj, obj); err != nil {
return false, apierrors.NewBadRequest(err.Error())
}

// TODO: (killianmuldoon) Understand if setting this twice is necessary.
killianmuldoon marked this conversation as resolved.
Show resolved Hide resolved
// Required to override default beforeUpdate behaviour
// that prevent changes to automatically managed fields.
obj.SetDeletionTimestamp(&now)

objects[objKey] = obj
c.afterUpdate(resourceGroup, oldObj, obj)
}

// If the object still has finalizers return early.
if len(obj.GetFinalizers()) > 0 {
return false, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ func Test_cache_client(t *testing.T) {
g.Expect(c.resourceGroups["foo"].objects).To(HaveKey(cloudv1.GroupVersion.WithKind(cloudv1.CloudMachineKind)), "gvk must exist in object tracker for foo")
g.Expect(c.resourceGroups["foo"].objects[cloudv1.GroupVersion.WithKind(cloudv1.CloudMachineKind)]).ToNot(HaveKey(types.NamespacedName{Name: "bar"}), "Object bar must not exist in object tracker for foo")

g.Expect(h.Events()).ToNot(ContainElement("foo, CloudMachine=bar, Deleted"))
g.Expect(h.Events()).To(ContainElement("foo, CloudMachine=bar, Deleted"))
})

t.Run("delete with finalizers", func(t *testing.T) {
Expand Down Expand Up @@ -760,6 +760,11 @@ func (i *fakeInformer) AddEventHandler(handler InformEventHandler) error {
return nil
}

func (i *fakeInformer) RemoveEventHandler(_ InformEventHandler) error {
i.handler = nil
return nil
}

func (i *fakeInformer) InformCreate(resourceGroup string, obj client.Object) {
i.handler.OnCreate(resourceGroup, obj)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ func (i *informer) AddEventHandler(handler InformEventHandler) error {
return nil
}

func (i *informer) RemoveEventHandler(handler InformEventHandler) error {
i.lock.Lock()
defer i.lock.Unlock()
for j, h := range i.handlers {
if h == handler {
i.handlers = append(i.handlers[:j], i.handlers[j+1:]...)
}
}
return nil
}

func (c *cache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) {
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ func (i *fakeInformer) AddEventHandler(handler ccache.InformEventHandler) error
return nil
}

func (i *fakeInformer) RemoveEventHandler(_ ccache.InformEventHandler) error {
i.handler = nil
return nil
}

func (i *fakeInformer) InformCreate(resourceGroup string, obj client.Object) {
i.handler.OnCreate(resourceGroup, obj)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func TestReconcileNormalEtcd(t *testing.T) {
// NOTE: make sure to use ports different than other tests, so we can run tests in parallel
MinPort: server.DefaultMinPort + 1000,
MaxPort: server.DefaultMinPort + 1099,
killianmuldoon marked this conversation as resolved.
Show resolved Hide resolved
DebugPort: server.DefaultDebugPort,
DebugPort: server.DefaultDebugPort + 10,
})
g.Expect(err).ToNot(HaveOccurred())
_, err = wcmux.InitWorkloadClusterListener(klog.KObj(cluster).String())
Expand Down Expand Up @@ -445,8 +445,8 @@ func TestReconcileNormalApiServer(t *testing.T) {
wcmux, err := server.NewWorkloadClustersMux(manager, host, server.CustomPorts{
// NOTE: make sure to use ports different than other tests, so we can run tests in parallel
MinPort: server.DefaultMinPort + 1100,
MaxPort: server.DefaultMinPort + 1299,
DebugPort: server.DefaultDebugPort,
MaxPort: server.DefaultMinPort + 1199,
DebugPort: server.DefaultDebugPort + 11,
})
g.Expect(err).ToNot(HaveOccurred())
_, err = wcmux.InitWorkloadClusterListener(klog.KObj(cluster).String())
Expand Down
10 changes: 10 additions & 0 deletions test/infrastructure/inmemory/internal/server/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,16 @@ func (h *apiServerHandler) apiV1List(req *restful.Request, resp *restful.Respons
return
}

// If the request is a Watch handle it using watchForResource.
if isWatch(req) {
err = h.watchForResource(req, resp, resourceGroup, *gvk)
if err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
}
return
}

// Reads and returns the requested data.
list := &unstructured.UnstructuredList{}
list.SetAPIVersion(gvk.GroupVersion().String())
Expand Down
191 changes: 191 additions & 0 deletions test/infrastructure/inmemory/internal/server/api/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package api

import (
"context"
"fmt"
"net/http"
"time"

"github.com/emicklei/go-restful/v3"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// Event records a lifecycle event for a Kubernetes object.
type Event struct {
Type watch.EventType `json:"type,omitempty"`
Object runtime.Object `json:"object,omitempty"`
}

// WatchEventDispatcher dispatches events for a single resourceGroup.
type WatchEventDispatcher struct {
resourceGroup string
events chan *Event
}

// OnCreate dispatches Create events.
func (m *WatchEventDispatcher) OnCreate(resourceGroup string, o client.Object) {
if resourceGroup != m.resourceGroup {
return
}
m.events <- &Event{
Type: watch.Added,
Object: o,
}
}

// OnUpdate dispatches Update events.
func (m *WatchEventDispatcher) OnUpdate(resourceGroup string, _, o client.Object) {
if resourceGroup != m.resourceGroup {
return
}
m.events <- &Event{
Type: watch.Modified,
Object: o,
}
}

// OnDelete dispatches Delete events.
func (m *WatchEventDispatcher) OnDelete(resourceGroup string, o client.Object) {
if resourceGroup != m.resourceGroup {
return
}
m.events <- &Event{
Type: watch.Deleted,
Object: o,
}
}

// OnGeneric dispatches Generic events.
func (m *WatchEventDispatcher) OnGeneric(resourceGroup string, o client.Object) {
if resourceGroup != m.resourceGroup {
return
}
m.events <- &Event{
Type: "GENERIC",
Object: o,
}
}

// isWatch is true if the request contains `watch="true"` as a query parameter.
func isWatch(req *restful.Request) bool {
killianmuldoon marked this conversation as resolved.
Show resolved Hide resolved
return req.QueryParameter("watch") == "true"
}

func (h *apiServerHandler) watchForResource(req *restful.Request, resp *restful.Response, resourceGroup string, gvk schema.GroupVersionKind) (reterr error) {
ctx := req.Request.Context()
queryTimeout := req.QueryParameter("timeoutSeconds")
killianmuldoon marked this conversation as resolved.
Show resolved Hide resolved
c := h.manager.GetCache()
i, err := c.GetInformerForKind(ctx, gvk)
if err != nil {
return err
}
h.log.Info(fmt.Sprintf("Serving Watch for %v", req.Request.URL))
// With an unbuffered event channel RemoveEventHandler could be blocked because it requires a lock on the informer.
// When Run stops reading from the channel the informer could be blocked with an unbuffered chanel and then RemoveEventHandler never goes through.
events := make(chan *Event, 10)
watcher := &WatchEventDispatcher{
resourceGroup: resourceGroup,
events: events,
}

if err := i.AddEventHandler(watcher); err != nil {
return err
}

// Defer cleanup which removes the event handler and ensures the channel is empty of events.
defer func() {
reterr = i.RemoveEventHandler(watcher)
// Doing this to ensure the channel is empty.
L:
for {
select {
case <-events:
default:
break L
}
}
}()

if err = watcher.Run(ctx, queryTimeout, resp); err != nil {
return err
}
return reterr
killianmuldoon marked this conversation as resolved.
Show resolved Hide resolved
}

// Run serves a series of encoded events via HTTP with Transfer-Encoding: chunked.
func (m *WatchEventDispatcher) Run(ctx context.Context, timeout string, w http.ResponseWriter) error {
flusher, ok := w.(http.Flusher)
if !ok {
return errors.New("can't start Watch: can't get http.Flusher")
}
resp, ok := w.(*restful.Response)
if !ok {
return errors.New("can't start Watch: can't get restful.Response")
}
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()

timeoutTimer, seconds, err := setTimer(timeout)
if err != nil {
return errors.Wrapf(err, "can't start Watch: could not set timeout")
}

ctx, cancel := context.WithTimeout(ctx, seconds)
defer cancel()
defer timeoutTimer.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-timeoutTimer.C:
return nil
case event, ok := <-m.events:
if !ok {
// End of results.
return nil
}
if err := resp.WriteEntity(event); err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
}
if len(m.events) == 0 {
flusher.Flush()
}
}
}
}

// setTimer creates a time.Timer with the passed `timeout` or a default timeout of 120 seconds if `timeout` is empty.
func setTimer(timeout string) (*time.Timer, time.Duration, error) {
var defaultTimeout = 120 * time.Second
if timeout == "" {
t := time.NewTimer(defaultTimeout)
return t, defaultTimeout, nil
}
seconds, err := time.ParseDuration(fmt.Sprintf("%ss", timeout))
if err != nil {
return nil, 0, errors.Wrapf(err, "Could not parse request timeout %s", timeout)
}
t := time.NewTimer(seconds)
return t, seconds, nil
}
4 changes: 2 additions & 2 deletions test/infrastructure/inmemory/internal/server/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *WorkloadClusterListener) RESTConfig() (*rest.Config, error) {
}

// GetClient returns a client for a WorkloadClusterListener.
func (s *WorkloadClusterListener) GetClient() (client.Client, error) {
func (s *WorkloadClusterListener) GetClient() (client.WithWatch, error) {
restConfig, err := s.RESTConfig()
if err != nil {
return nil, err
Expand All @@ -130,7 +130,7 @@ func (s *WorkloadClusterListener) GetClient() (client.Client, error) {
return nil, err
}

c, err := client.New(restConfig, client.Options{Scheme: s.scheme, Mapper: mapper})
c, err := client.NewWithWatch(restConfig, client.Options{Scheme: s.scheme, Mapper: mapper})
if err != nil {
return nil, err
}
Expand Down
Loading
Loading