Skip to content

Commit

Permalink
Add watch to in-memory server multiplexer
Browse files Browse the repository at this point in the history
Signed-off-by: killianmuldoon <kmuldoon@vmware.com>
  • Loading branch information
killianmuldoon committed Jun 13, 2023
1 parent 238ca15 commit a32a0c0
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 8 deletions.
4 changes: 3 additions & 1 deletion test/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ replace sigs.k8s.io/cluster-api => ../

require (
github.com/blang/semver v3.5.1+incompatible
github.com/davecgh/go-spew v1.1.1
github.com/docker/docker v20.10.24+incompatible
github.com/docker/go-connections v0.4.0
github.com/emicklei/go-restful/v3 v3.9.0
Expand All @@ -16,6 +17,7 @@ require (
github.com/onsi/gomega v1.27.8
github.com/pkg/errors v0.9.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.3
github.com/vincent-petithory/dataurl v1.0.0
go.etcd.io/etcd/api/v3 v3.5.9
go.etcd.io/etcd/client/v3 v3.5.9
Expand Down Expand Up @@ -53,7 +55,6 @@ require (
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/coreos/go-systemd/v22 v22.4.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/drone/envsubst/v2 v2.0.0-20210730161058-179042472c46 // indirect
Expand Down Expand Up @@ -99,6 +100,7 @@ require (
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.15.1 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Cache interface {
// Informer forwards events to event handlers.
type Informer interface {
AddEventHandler(handler InformEventHandler) error
RemoveEventHandler(handler InformEventHandler) error
}

// InformEventHandler handle events originated by a source.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,11 +409,8 @@ func (c *cache) doTryDeleteLocked(resourceGroup string, tracker *resourceGroupTr
delete(tracker.ownedObjects, ownReference{gvk: objGVK, key: objKey})
}

// If the object still has finalizers, only set the deletion timestamp if not already set.
if len(obj.GetFinalizers()) > 0 {
if !obj.GetDeletionTimestamp().IsZero() {
return false, nil
}
// Set the deletion timestamp if not already set.
if obj.GetDeletionTimestamp().IsZero() {
if err := c.beforeDelete(resourceGroup, obj); err != nil {
return false, apierrors.NewBadRequest(err.Error())
}
Expand All @@ -424,13 +421,18 @@ func (c *cache) doTryDeleteLocked(resourceGroup string, tracker *resourceGroupTr
if err := c.beforeUpdate(resourceGroup, oldObj, obj); err != nil {
return false, apierrors.NewBadRequest(err.Error())
}

// TODO: (killianmuldoon) Understand if setting this twice is necessary.
// Required to override default beforeUpdate behaviour
// that prevent changes to automatically managed fields.
obj.SetDeletionTimestamp(&now)

objects[objKey] = obj
c.afterUpdate(resourceGroup, oldObj, obj)
}

// If the object still has finalizers return early.
if len(obj.GetFinalizers()) > 0 {
return false, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ func (i *informer) AddEventHandler(handler InformEventHandler) error {
return nil
}

func (i *informer) RemoveEventHandler(handler InformEventHandler) error {
i.lock.Lock()
defer i.lock.Unlock()
for j, h := range i.handlers {
if h == handler {
i.handlers = append(i.handlers[:j], i.handlers[j+1:]...)
}
}
return nil
}

func (c *cache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) {
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
Expand Down
144 changes: 144 additions & 0 deletions test/infrastructure/inmemory/internal/server/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/http"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/emicklei/go-restful/v3"
"github.com/go-logr/logr"
"github.com/pkg/errors"
Expand All @@ -36,9 +37,11 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/portforward"
"sigs.k8s.io/controller-runtime/pkg/client"

cclient "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud/runtime/client"
cmanager "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud/runtime/manager"
gportforward "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/server/api/portforward"
)
Expand Down Expand Up @@ -230,6 +233,16 @@ func (h *apiServerHandler) apiV1List(req *restful.Request, resp *restful.Respons
return
}

// If the request is for a watch it will be handled separately.
if isWatch(req) {
err = h.watchForResource(ctx, cloudClient, *gvk, resourceGroup, req, resp)
if err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
}
return
}

// Reads and returns the requested data.
list := &unstructured.UnstructuredList{}
list.SetAPIVersion(gvk.GroupVersion().String())
Expand Down Expand Up @@ -529,3 +542,134 @@ func getAPIResourceList(req *restful.Request) *metav1.APIResourceList {
}
return corev1APIResourceList
}

func (h *apiServerHandler) watchForResource(ctx context.Context, cloudClient cclient.Client, gvk schema.GroupVersionKind, resourceGroup string, req *restful.Request, resp *restful.Response) error {
c := h.manager.GetCache()
i, err := c.GetInformerForKind(ctx, gvk)
if err != nil {
return err
}
h.log.Info(fmt.Sprintf("[DEBUG] Serving Watch for %v", req.Request.URL))
// This needs to be an unbuffered channel as otherwise it can cause a deadlock.
// TODO: explain this better.
events := make(chan *Event, 10)
watcher := &WatchEventDispatcher{
resourceGroup: resourceGroup,
events: events,
}

if err := i.AddEventHandler(watcher); err != nil {
return err
}
// TODO: Catch this error and return.
// Empty the channel here.
defer func() {
i.RemoveEventHandler(watcher)
// TODO: Doing this to ensure the channel is empty.
for event := range events {
_ = event
}
}()

watcher.Run(*req.Request, gvk, resp, h)
return nil
}

// Run serves a series of encoded events via HTTP with Transfer-Encoding: chunked
func (m *WatchEventDispatcher) Run(req http.Request, gvk schema.GroupVersionKind, w http.ResponseWriter, h *apiServerHandler) {
h.log.Info(fmt.Sprintf("[DEBUG] Starting a watch %v", req))

flusher, ok := w.(http.Flusher)
if !ok {
fmt.Printf("unable to start watch - can't get http.Flusher: %#v", w)
return
}
resp, ok := w.(*restful.Response)
if !ok {
fmt.Printf("unable to start watch - can't get restful.Response: %#v", w)
return
}
//TODO: Add timeout
// begin the stream
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()

done := req.Context().Done()

for {
select {
case <-done:
return
case event, ok := <-m.events:
h.log.Info(fmt.Sprintf("[DEBUG] Writing event %v", spew.Sprint(event)))
if !ok {
h.log.Info(fmt.Sprintf("[DEBUG] closing watch %v", spew.Sprint(event)))
// End of results.
return
}
if err := resp.WriteEntity(event); err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
}
if len(m.events) == 0 {
flusher.Flush()
}
}
}
}

type Event struct {
Type watch.EventType `json:"type,omitempty"`
Object runtime.Object `json:"object,omitempty"`
}

func isWatch(req *restful.Request) bool {
// TODO: Check if this is the case in all watch cases.
// Check for existence of the query param.
return req.QueryParameter("watch") == "true"
}

type WatchEventDispatcher struct {
resourceGroup string
events chan (*Event)
}

func (m *WatchEventDispatcher) OnCreate(resourceGroup string, o client.Object) {
if resourceGroup != m.resourceGroup {
return
}
m.events <- &Event{
Type: watch.Added,
Object: o,
}
}

func (m *WatchEventDispatcher) OnUpdate(resourceGroup string, old, new client.Object) {
if resourceGroup != m.resourceGroup {
return
}
m.events <- &Event{
Type: watch.Modified,
Object: new,
}
}

func (m *WatchEventDispatcher) OnDelete(resourceGroup string, o client.Object) {
if resourceGroup != m.resourceGroup {
return
}
m.events <- &Event{
Type: watch.Deleted,
Object: o,
}
}

func (m *WatchEventDispatcher) OnGeneric(resourceGroup string, o client.Object) {
if resourceGroup != m.resourceGroup {
return
}
m.events <- &Event{
Type: watch.EventType("GENERIC"),
Object: o,
}
}
25 changes: 25 additions & 0 deletions test/infrastructure/inmemory/internal/server/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,28 @@ func (s *WorkloadClusterListener) GetClient() (client.Client, error) {

return c, nil
}

// GetClient returns a client for a WorkloadClusterListener.
func (s *WorkloadClusterListener) GetClientWithWatch() (client.WithWatch, error) {
restConfig, err := s.RESTConfig()
if err != nil {
return nil, err
}

httpClient, err := rest.HTTPClientFor(restConfig)
if err != nil {
return nil, err
}

mapper, err := apiutil.NewDynamicRESTMapper(restConfig, httpClient)
if err != nil {
return nil, err
}

c, err := client.NewWithWatch(restConfig, client.Options{Scheme: s.scheme, Mapper: mapper})
if err != nil {
return nil, err
}

return c, nil
}
Loading

0 comments on commit a32a0c0

Please sign in to comment.