Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send DAG status updates via StatusUpdateHandler #2613

Merged
merged 1 commit into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions cmd/contour/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,6 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
},
HoldoffDelay: 100 * time.Millisecond,
HoldoffMaxDelay: 500 * time.Millisecond,
StatusClient: &k8s.StatusWriter{
Client: clients.DynamicClient(),
Converter: converter,
},
Builder: dag.Builder{
Source: dag.KubernetesCache{
RootNamespaces: ctx.proxyRootNamespaces(),
Expand Down Expand Up @@ -326,17 +322,22 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
LeaderElected: eventHandler.IsLeader,
Converter: converter,
}
suw := sh.Writer()
g.Add(sh.Start)

// Now we have the statusUpdateWriter, we can create the StatusWriter, which will take the
// status updates from the DAG, and send them to the status update handler.
eventHandler.StatusClient = &k8s.StatusWriter{
Updater: sh.Writer(),
}

// step 11. set up ingress load balancer status writer
lbsw := loadBalancerStatusWriter{
log: log.WithField("context", "loadBalancerStatusWriter"),
clients: clients,
isLeader: eventHandler.IsLeader,
lbStatus: make(chan corev1.LoadBalancerStatus, 1),
ingressClass: ctx.ingressClass,
statusUpdater: suw,
statusUpdater: sh.Writer(),
Converter: converter,
}
g.Add(lbsw.Start)
Expand All @@ -347,6 +348,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
Next: &k8s.ServiceStatusLoadBalancerWatcher{
ServiceName: ctx.EnvoyServiceName,
LBStatus: lbsw.lbStatus,
Log: log.WithField("context", "serviceStatusLoadBalancerWatcher"),
},
Converter: converter,
Logger: log.WithField("context", "serviceStatusLoadBalancerWatcher"),
Expand Down
55 changes: 19 additions & 36 deletions internal/k8s/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,9 @@
package k8s

import (
"context"
"errors"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/client-go/dynamic"

projcontour "github.com/projectcontour/contour/apis/projectcontour/v1"
)

Expand Down Expand Up @@ -102,8 +97,7 @@ func (c *StatusCacher) SetStatus(status, desc string, obj interface{}) error {

// StatusWriter updates the object's Status field.
type StatusWriter struct {
Client dynamic.Interface
Converter Converter
Updater StatusUpdater
}

// GetStatus is not implemented for StatusWriter.
Expand All @@ -115,35 +109,24 @@ func (irs *StatusWriter) GetStatus(obj interface{}) (*projcontour.Status, error)
func (irs *StatusWriter) SetStatus(status, desc string, existing interface{}) error {
switch exist := existing.(type) {
case *projcontour.HTTPProxy:
// Check if update needed by comparing status & desc
if irs.updateNeeded(status, desc, exist.Status) {
updated := exist.DeepCopy()
updated.Status = projcontour.Status{
CurrentStatus: status,
Description: desc,
}
return irs.setHTTPProxyStatus(updated)
}
// StatusUpdateWriters only apply an update if required, so
// we don't need to check here.
irs.Updater.Update(exist.Name,
exist.Namespace,
projcontour.HTTPProxyGVR,
StatusMutatorFunc(func(obj interface{}) interface{} {
switch o := obj.(type) {
case *projcontour.HTTPProxy:
dco := o.DeepCopy()
dco.Status.CurrentStatus = status
dco.Status.Description = desc
return dco
default:
panic(fmt.Sprintf("Unsupported object %s/%s in status Address mutator",
exist.Namespace, exist.Name,
))
}
}))
}
return nil
}

func (irs *StatusWriter) updateNeeded(status, desc string, existing projcontour.Status) bool {
if existing.CurrentStatus != status || existing.Description != desc {
return true
}
return false
}

func (irs *StatusWriter) setHTTPProxyStatus(updated *projcontour.HTTPProxy) error {

usUpdated, err := irs.Converter.ToUnstructured(updated)
if err != nil {
return fmt.Errorf("unable to convert status update to HTTPProxy: %s", err)
}

_, err = irs.Client.Resource(projcontour.HTTPProxyGVR).Namespace(updated.GetNamespace()).
UpdateStatus(context.TODO(), usUpdated, metav1.UpdateOptions{})

return err
}
53 changes: 16 additions & 37 deletions internal/k8s/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,9 @@ import (

"github.com/projectcontour/contour/internal/assert"

"k8s.io/client-go/dynamic/fake"

projcontour "github.com/projectcontour/contour/apis/projectcontour/v1"
projectcontour "github.com/projectcontour/contour/apis/projectcontour/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
k8stesting "k8s.io/client-go/testing"
)

func TestSetHTTPProxyStatus(t *testing.T) {
Expand All @@ -42,43 +38,19 @@ func TestSetHTTPProxyStatus(t *testing.T) {

t.Run(name, func(t *testing.T) {
t.Helper()
var gotObj runtime.Object

s := runtime.NewScheme()
if err := projcontour.AddToScheme(s); err != nil {
t.Fatalf("adding to scheme: %s", err)
}

usc, err := NewUnstructuredConverter()
if err != nil {
t.Fatal(err)
}

client := fake.NewSimpleDynamicClient(s, tc.existing)

client.PrependReactor("*", "httpproxies", func(action k8stesting.Action) (bool, runtime.Object, error) {
switch updateAction := action.(type) {
default:
return true, nil, fmt.Errorf("got unexpected action of type: %T", action)
case k8stesting.UpdateActionImpl:
gotObj = updateAction.GetObject()
return true, tc.existing, nil
}
})

suc := &StatusUpdateCacher{}
proxysw := StatusWriter{
Client: client,
Converter: usc,
Updater: suc,
}

suc.AddObject(tc.existing.Name, tc.existing.Namespace, projcontour.HTTPProxyGVR, tc.existing)

if err := proxysw.SetStatus(tc.msg, tc.desc, tc.existing); err != nil {
t.Fatal(fmt.Errorf("unable to set proxy status: %s", err))
}

toProxy, err := usc.FromUnstructured(gotObj)
if err != nil {
t.Fatal(err)
}
toProxy := suc.GetObject(tc.existing.Name, tc.existing.Namespace, projcontour.HTTPProxyGVR)

if toProxy == nil && tc.expected == nil {
return
Expand Down Expand Up @@ -130,7 +102,16 @@ func TestSetHTTPProxyStatus(t *testing.T) {
Description: "this is a valid HTTPProxy",
},
},
expected: nil,
expected: &projcontour.HTTPProxy{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
},
Status: projcontour.Status{
CurrentStatus: "valid",
Description: "this is a valid HTTPProxy",
},
},
})

run(t, "replace existing status", testcase{
Expand Down Expand Up @@ -172,9 +153,7 @@ func TestGetStatus(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Helper()

proxysw := StatusWriter{
Client: fake.NewSimpleDynamicClient(runtime.NewScheme()),
}
proxysw := StatusWriter{}

status, err := proxysw.GetStatus(tc.input)

Expand Down
13 changes: 11 additions & 2 deletions internal/k8s/statusaddress.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *StatusAddressUpdater) OnAdd(obj interface{}) {
WithField("ingress-class", annotation.IngressClass(typed)).
WithField("defined-ingress-class", s.IngressClass).
WithField("kind", kind).
Debug("unmatched ingress class, skip status update")
Debug("unmatched ingress class, skipping status address update")
return
}

Expand All @@ -83,7 +83,7 @@ func (s *StatusAddressUpdater) OnAdd(obj interface{}) {
WithField("ingress-class", annotation.IngressClass(typed)).
WithField("kind", kind).
WithField("defined-ingress-class", s.IngressClass).
Debug("Received an object, sending status update")
Debug("received an object, sending status address update")

s.StatusUpdater.Update(
typed.GetObjectMeta().GetName(),
Expand Down Expand Up @@ -128,6 +128,7 @@ func (s *StatusAddressUpdater) OnDelete(obj interface{}) {
type ServiceStatusLoadBalancerWatcher struct {
ServiceName string
LBStatus chan v1.LoadBalancerStatus
Log logrus.FieldLogger
}

func (s *ServiceStatusLoadBalancerWatcher) OnAdd(obj interface{}) {
Expand All @@ -139,6 +140,10 @@ func (s *ServiceStatusLoadBalancerWatcher) OnAdd(obj interface{}) {
if svc.Name != s.ServiceName {
return
}
s.Log.WithField("name", svc.Name).
WithField("namespace", svc.Namespace).
Debug("received new service address")

s.notify(svc.Status.LoadBalancer)
}

Expand All @@ -151,6 +156,10 @@ func (s *ServiceStatusLoadBalancerWatcher) OnUpdate(oldObj, newObj interface{})
if svc.Name != s.ServiceName {
return
}
s.Log.WithField("name", svc.Name).
WithField("namespace", svc.Namespace).
Debug("received new service address")

s.notify(svc.Status.LoadBalancer)
}

Expand Down
20 changes: 20 additions & 0 deletions internal/k8s/statusaddress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,27 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
)

func testLogger(t *testing.T) logrus.FieldLogger {
log := logrus.New()
log.Out = &testWriter{t}
return log
}

type testWriter struct {
*testing.T
}

func (t *testWriter) Write(buf []byte) (int, error) {
t.Logf("%s", buf)
return len(buf), nil
}

func TestServiceStatusLoadBalancerWatcherOnAdd(t *testing.T) {
lbstatus := make(chan v1.LoadBalancerStatus, 1)
sw := ServiceStatusLoadBalancerWatcher{
ServiceName: "envoy",
LBStatus: lbstatus,
Log: testLogger(t),
}

recv := func() (v1.LoadBalancerStatus, bool) {
Expand Down Expand Up @@ -74,9 +90,11 @@ func TestServiceStatusLoadBalancerWatcherOnAdd(t *testing.T) {

func TestServiceStatusLoadBalancerWatcherOnUpdate(t *testing.T) {
lbstatus := make(chan v1.LoadBalancerStatus, 1)

sw := ServiceStatusLoadBalancerWatcher{
ServiceName: "envoy",
LBStatus: lbstatus,
Log: testLogger(t),
}

recv := func() (v1.LoadBalancerStatus, bool) {
Expand Down Expand Up @@ -122,9 +140,11 @@ func TestServiceStatusLoadBalancerWatcherOnUpdate(t *testing.T) {

func TestServiceStatusLoadBalancerWatcherOnDelete(t *testing.T) {
lbstatus := make(chan v1.LoadBalancerStatus, 1)

sw := ServiceStatusLoadBalancerWatcher{
ServiceName: "envoy",
LBStatus: lbstatus,
Log: testLogger(t),
}

recv := func() (v1.LoadBalancerStatus, bool) {
Expand Down
4 changes: 3 additions & 1 deletion internal/k8s/statusupdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ func (suh *StatusUpdateHandler) Start(stop <-chan struct{}) error {
continue
}

suh.Log.WithField("fullname", upd.FullName).Debug("received a status update")
suh.Log.WithField("name", upd.FullName.Name).
WithField("namespace", upd.FullName.Namespace).
Debug("received a status update")
uObj, err := suh.Clients.DynamicClient().
Resource(upd.Resource).
Namespace(upd.FullName.Namespace).Get(context.TODO(), upd.FullName.Name, metav1.GetOptions{})
Expand Down