Skip to content

Commit

Permalink
Send DAG status updates via StatusUpdateHandler
Browse files Browse the repository at this point in the history
Updates the DAG StatusClient to use the StatusUpdateWriter pattern.

Fixes projectcontour#2560
Fixes projectcontour#2580
Fixes projectcontour#2522

We should be able to remove the StatusClient eventually, but that API also needs refactoring for adding Conditions (currently under projectcontour#2495), so for now I've just updated it to send updates via the StatusUpdateWriter rather than make API calls directly.

Signed-off-by: Nick Young <ynick@vmware.com>
  • Loading branch information
Nick Young committed Jun 19, 2020
1 parent d75219a commit 36f62e8
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 82 deletions.
14 changes: 8 additions & 6 deletions cmd/contour/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,6 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
},
HoldoffDelay: 100 * time.Millisecond,
HoldoffMaxDelay: 500 * time.Millisecond,
StatusClient: &k8s.StatusWriter{
Client: clients.DynamicClient(),
Converter: converter,
},
Builder: dag.Builder{
Source: dag.KubernetesCache{
RootNamespaces: ctx.proxyRootNamespaces(),
Expand Down Expand Up @@ -326,17 +322,22 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
LeaderElected: eventHandler.IsLeader,
Converter: converter,
}
suw := sh.Writer()
g.Add(sh.Start)

// Now we have the statusUpdateWriter, we can create the StatusWriter, which will take the
// status updates from the DAG, and send them to the status update handler.
eventHandler.StatusClient = &k8s.StatusWriter{
Updater: sh.Writer(),
}

// step 11. set up ingress load balancer status writer
lbsw := loadBalancerStatusWriter{
log: log.WithField("context", "loadBalancerStatusWriter"),
clients: clients,
isLeader: eventHandler.IsLeader,
lbStatus: make(chan corev1.LoadBalancerStatus, 1),
ingressClass: ctx.ingressClass,
statusUpdater: suw,
statusUpdater: sh.Writer(),
Converter: converter,
}
g.Add(lbsw.Start)
Expand All @@ -347,6 +348,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
Next: &k8s.ServiceStatusLoadBalancerWatcher{
ServiceName: ctx.EnvoyServiceName,
LBStatus: lbsw.lbStatus,
Log: log.WithField("context", "serviceStatusLoadBalancerWatcher"),
},
Converter: converter,
Logger: log.WithField("context", "serviceStatusLoadBalancerWatcher"),
Expand Down
55 changes: 19 additions & 36 deletions internal/k8s/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,9 @@
package k8s

import (
"context"
"errors"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/client-go/dynamic"

projcontour "github.com/projectcontour/contour/apis/projectcontour/v1"
)

Expand Down Expand Up @@ -102,8 +97,7 @@ func (c *StatusCacher) SetStatus(status, desc string, obj interface{}) error {

// StatusWriter updates the object's Status field.
type StatusWriter struct {
Client dynamic.Interface
Converter Converter
Updater StatusUpdater
}

// GetStatus is not implemented for StatusWriter.
Expand All @@ -115,35 +109,24 @@ func (irs *StatusWriter) GetStatus(obj interface{}) (*projcontour.Status, error)
func (irs *StatusWriter) SetStatus(status, desc string, existing interface{}) error {
switch exist := existing.(type) {
case *projcontour.HTTPProxy:
// Check if update needed by comparing status & desc
if irs.updateNeeded(status, desc, exist.Status) {
updated := exist.DeepCopy()
updated.Status = projcontour.Status{
CurrentStatus: status,
Description: desc,
}
return irs.setHTTPProxyStatus(updated)
}
// StatusUpdateWriters only apply an update if required, so
// we don't need to check here.
irs.Updater.Update(exist.Name,
exist.Namespace,
projcontour.HTTPProxyGVR,
StatusMutatorFunc(func(obj interface{}) interface{} {
switch o := obj.(type) {
case *projcontour.HTTPProxy:
dco := o.DeepCopy()
dco.Status.CurrentStatus = status
dco.Status.Description = desc
return dco
default:
panic(fmt.Sprintf("Unsupported object %s/%s in status Address mutator",
exist.Namespace, exist.Name,
))
}
}))
}
return nil
}

func (irs *StatusWriter) updateNeeded(status, desc string, existing projcontour.Status) bool {
if existing.CurrentStatus != status || existing.Description != desc {
return true
}
return false
}

func (irs *StatusWriter) setHTTPProxyStatus(updated *projcontour.HTTPProxy) error {

usUpdated, err := irs.Converter.ToUnstructured(updated)
if err != nil {
return fmt.Errorf("unable to convert status update to HTTPProxy: %s", err)
}

_, err = irs.Client.Resource(projcontour.HTTPProxyGVR).Namespace(updated.GetNamespace()).
UpdateStatus(context.TODO(), usUpdated, metav1.UpdateOptions{})

return err
}
53 changes: 16 additions & 37 deletions internal/k8s/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,9 @@ import (

"github.com/projectcontour/contour/internal/assert"

"k8s.io/client-go/dynamic/fake"

projcontour "github.com/projectcontour/contour/apis/projectcontour/v1"
projectcontour "github.com/projectcontour/contour/apis/projectcontour/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
k8stesting "k8s.io/client-go/testing"
)

func TestSetHTTPProxyStatus(t *testing.T) {
Expand All @@ -42,43 +38,19 @@ func TestSetHTTPProxyStatus(t *testing.T) {

t.Run(name, func(t *testing.T) {
t.Helper()
var gotObj runtime.Object

s := runtime.NewScheme()
if err := projcontour.AddToScheme(s); err != nil {
t.Fatalf("adding to scheme: %s", err)
}

usc, err := NewUnstructuredConverter()
if err != nil {
t.Fatal(err)
}

client := fake.NewSimpleDynamicClient(s, tc.existing)

client.PrependReactor("*", "httpproxies", func(action k8stesting.Action) (bool, runtime.Object, error) {
switch updateAction := action.(type) {
default:
return true, nil, fmt.Errorf("got unexpected action of type: %T", action)
case k8stesting.UpdateActionImpl:
gotObj = updateAction.GetObject()
return true, tc.existing, nil
}
})

suc := &StatusUpdateCacher{}
proxysw := StatusWriter{
Client: client,
Converter: usc,
Updater: suc,
}

suc.AddObject(tc.existing.Name, tc.existing.Namespace, projcontour.HTTPProxyGVR, tc.existing)

if err := proxysw.SetStatus(tc.msg, tc.desc, tc.existing); err != nil {
t.Fatal(fmt.Errorf("unable to set proxy status: %s", err))
}

toProxy, err := usc.FromUnstructured(gotObj)
if err != nil {
t.Fatal(err)
}
toProxy := suc.GetObject(tc.existing.Name, tc.existing.Namespace, projcontour.HTTPProxyGVR)

if toProxy == nil && tc.expected == nil {
return
Expand Down Expand Up @@ -130,7 +102,16 @@ func TestSetHTTPProxyStatus(t *testing.T) {
Description: "this is a valid HTTPProxy",
},
},
expected: nil,
expected: &projcontour.HTTPProxy{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
},
Status: projcontour.Status{
CurrentStatus: "valid",
Description: "this is a valid HTTPProxy",
},
},
})

run(t, "replace existing status", testcase{
Expand Down Expand Up @@ -172,9 +153,7 @@ func TestGetStatus(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Helper()

proxysw := StatusWriter{
Client: fake.NewSimpleDynamicClient(runtime.NewScheme()),
}
proxysw := StatusWriter{}

status, err := proxysw.GetStatus(tc.input)

Expand Down
22 changes: 20 additions & 2 deletions internal/k8s/statusaddress.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *StatusAddressUpdater) OnAdd(obj interface{}) {
WithField("ingress-class", annotation.IngressClass(typed)).
WithField("defined-ingress-class", s.IngressClass).
WithField("kind", kind).
Debug("unmatched ingress class, skip status update")
Debug("unmatched ingress class, skipping status address update")
return
}

Expand All @@ -83,7 +83,7 @@ func (s *StatusAddressUpdater) OnAdd(obj interface{}) {
WithField("ingress-class", annotation.IngressClass(typed)).
WithField("kind", kind).
WithField("defined-ingress-class", s.IngressClass).
Debug("Received an object, sending status update")
Debug("Received an object, sending status address update")

s.StatusUpdater.Update(
typed.GetObjectMeta().GetName(),
Expand Down Expand Up @@ -128,6 +128,7 @@ func (s *StatusAddressUpdater) OnDelete(obj interface{}) {
type ServiceStatusLoadBalancerWatcher struct {
ServiceName string
LBStatus chan v1.LoadBalancerStatus
Log logrus.FieldLogger
}

func (s *ServiceStatusLoadBalancerWatcher) OnAdd(obj interface{}) {
Expand All @@ -136,9 +137,17 @@ func (s *ServiceStatusLoadBalancerWatcher) OnAdd(obj interface{}) {
// not a service
return
}
s.Log.WithField("name", svc.Name).
WithField("namespace", svc.Namespace).
Debug("saw a service add")

if svc.Name != s.ServiceName {
return
}
s.Log.WithField("name", svc.Name).
WithField("namespace", svc.Namespace).
Debug("Received new service address")

s.notify(svc.Status.LoadBalancer)
}

Expand All @@ -148,9 +157,18 @@ func (s *ServiceStatusLoadBalancerWatcher) OnUpdate(oldObj, newObj interface{})
// not a service
return
}

s.Log.WithField("name", svc.Name).
WithField("namespace", svc.Namespace).
Debug("saw a service update")

if svc.Name != s.ServiceName {
return
}
s.Log.WithField("name", svc.Name).
WithField("namespace", svc.Namespace).
Debug("Received new service address")

s.notify(svc.Status.LoadBalancer)
}

Expand Down
20 changes: 20 additions & 0 deletions internal/k8s/statusaddress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,27 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
)

func testLogger(t *testing.T) logrus.FieldLogger {
log := logrus.New()
log.Out = &testWriter{t}
return log
}

type testWriter struct {
*testing.T
}

func (t *testWriter) Write(buf []byte) (int, error) {
t.Logf("%s", buf)
return len(buf), nil
}

func TestServiceStatusLoadBalancerWatcherOnAdd(t *testing.T) {
lbstatus := make(chan v1.LoadBalancerStatus, 1)
sw := ServiceStatusLoadBalancerWatcher{
ServiceName: "envoy",
LBStatus: lbstatus,
Log: testLogger(t),
}

recv := func() (v1.LoadBalancerStatus, bool) {
Expand Down Expand Up @@ -74,9 +90,11 @@ func TestServiceStatusLoadBalancerWatcherOnAdd(t *testing.T) {

func TestServiceStatusLoadBalancerWatcherOnUpdate(t *testing.T) {
lbstatus := make(chan v1.LoadBalancerStatus, 1)

sw := ServiceStatusLoadBalancerWatcher{
ServiceName: "envoy",
LBStatus: lbstatus,
Log: testLogger(t),
}

recv := func() (v1.LoadBalancerStatus, bool) {
Expand Down Expand Up @@ -122,9 +140,11 @@ func TestServiceStatusLoadBalancerWatcherOnUpdate(t *testing.T) {

func TestServiceStatusLoadBalancerWatcherOnDelete(t *testing.T) {
lbstatus := make(chan v1.LoadBalancerStatus, 1)

sw := ServiceStatusLoadBalancerWatcher{
ServiceName: "envoy",
LBStatus: lbstatus,
Log: testLogger(t),
}

recv := func() (v1.LoadBalancerStatus, bool) {
Expand Down
4 changes: 3 additions & 1 deletion internal/k8s/statusupdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ func (suh *StatusUpdateHandler) Start(stop <-chan struct{}) error {
continue
}

suh.Log.WithField("fullname", upd.FullName).Debug("received a status update")
suh.Log.WithField("name", upd.FullName.Name).
WithField("namespace", upd.FullName.Namespace).
Debug("received a status update")
uObj, err := suh.Clients.DynamicClient().
Resource(upd.Resource).
Namespace(upd.FullName.Namespace).Get(context.TODO(), upd.FullName.Name, metav1.GetOptions{})
Expand Down

0 comments on commit 36f62e8

Please sign in to comment.