diff --git a/cmd/contour/serve.go b/cmd/contour/serve.go index 2e87f19803f..1ed6b45dcbe 100644 --- a/cmd/contour/serve.go +++ b/cmd/contour/serve.go @@ -191,10 +191,6 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { }, HoldoffDelay: 100 * time.Millisecond, HoldoffMaxDelay: 500 * time.Millisecond, - StatusClient: &k8s.StatusWriter{ - Client: clients.DynamicClient(), - Converter: converter, - }, Builder: dag.Builder{ Source: dag.KubernetesCache{ RootNamespaces: ctx.proxyRootNamespaces(), @@ -326,9 +322,14 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { LeaderElected: eventHandler.IsLeader, Converter: converter, } - suw := sh.Writer() g.Add(sh.Start) + // Now we have the statusUpdateWriter, we can create the StatusWriter, which will take the + // status updates from the DAG, and send them to the status update handler. + eventHandler.StatusClient = &k8s.StatusWriter{ + Updater: sh.Writer(), + } + // step 11. set up ingress load balancer status writer lbsw := loadBalancerStatusWriter{ log: log.WithField("context", "loadBalancerStatusWriter"), @@ -336,7 +337,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { isLeader: eventHandler.IsLeader, lbStatus: make(chan corev1.LoadBalancerStatus, 1), ingressClass: ctx.ingressClass, - statusUpdater: suw, + statusUpdater: sh.Writer(), Converter: converter, } g.Add(lbsw.Start) @@ -347,6 +348,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { Next: &k8s.ServiceStatusLoadBalancerWatcher{ ServiceName: ctx.EnvoyServiceName, LBStatus: lbsw.lbStatus, + Log: log.WithField("context", "serviceStatusLoadBalancerWatcher"), }, Converter: converter, Logger: log.WithField("context", "serviceStatusLoadBalancerWatcher"), diff --git a/internal/k8s/status.go b/internal/k8s/status.go index 3ca9c23d2ed..488f22ca138 100644 --- a/internal/k8s/status.go +++ b/internal/k8s/status.go @@ -14,14 +14,9 @@ package k8s import ( - "context" "errors" "fmt" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "k8s.io/client-go/dynamic" - projcontour "github.com/projectcontour/contour/apis/projectcontour/v1" ) @@ -102,8 +97,7 @@ func (c *StatusCacher) SetStatus(status, desc string, obj interface{}) error { // StatusWriter updates the object's Status field. type StatusWriter struct { - Client dynamic.Interface - Converter Converter + Updater StatusUpdater } // GetStatus is not implemented for StatusWriter. @@ -115,35 +109,24 @@ func (irs *StatusWriter) GetStatus(obj interface{}) (*projcontour.Status, error) func (irs *StatusWriter) SetStatus(status, desc string, existing interface{}) error { switch exist := existing.(type) { case *projcontour.HTTPProxy: - // Check if update needed by comparing status & desc - if irs.updateNeeded(status, desc, exist.Status) { - updated := exist.DeepCopy() - updated.Status = projcontour.Status{ - CurrentStatus: status, - Description: desc, - } - return irs.setHTTPProxyStatus(updated) - } + // StatusUpdateWriters only apply an update if required, so + // we don't need to check here. + irs.Updater.Update(exist.Name, + exist.Namespace, + projcontour.HTTPProxyGVR, + StatusMutatorFunc(func(obj interface{}) interface{} { + switch o := obj.(type) { + case *projcontour.HTTPProxy: + dco := o.DeepCopy() + dco.Status.CurrentStatus = status + dco.Status.Description = desc + return dco + default: + panic(fmt.Sprintf("Unsupported object %s/%s in status Address mutator", + exist.Namespace, exist.Name, + )) + } + })) } return nil } - -func (irs *StatusWriter) updateNeeded(status, desc string, existing projcontour.Status) bool { - if existing.CurrentStatus != status || existing.Description != desc { - return true - } - return false -} - -func (irs *StatusWriter) setHTTPProxyStatus(updated *projcontour.HTTPProxy) error { - - usUpdated, err := irs.Converter.ToUnstructured(updated) - if err != nil { - return fmt.Errorf("unable to convert status update to HTTPProxy: %s", err) - } - - _, err = irs.Client.Resource(projcontour.HTTPProxyGVR).Namespace(updated.GetNamespace()). - UpdateStatus(context.TODO(), usUpdated, metav1.UpdateOptions{}) - - return err -} diff --git a/internal/k8s/status_test.go b/internal/k8s/status_test.go index d412f65c4ab..3c31a8ed655 100644 --- a/internal/k8s/status_test.go +++ b/internal/k8s/status_test.go @@ -20,13 +20,9 @@ import ( "github.com/projectcontour/contour/internal/assert" - "k8s.io/client-go/dynamic/fake" - projcontour "github.com/projectcontour/contour/apis/projectcontour/v1" projectcontour "github.com/projectcontour/contour/apis/projectcontour/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - k8stesting "k8s.io/client-go/testing" ) func TestSetHTTPProxyStatus(t *testing.T) { @@ -42,43 +38,19 @@ func TestSetHTTPProxyStatus(t *testing.T) { t.Run(name, func(t *testing.T) { t.Helper() - var gotObj runtime.Object - - s := runtime.NewScheme() - if err := projcontour.AddToScheme(s); err != nil { - t.Fatalf("adding to scheme: %s", err) - } - - usc, err := NewUnstructuredConverter() - if err != nil { - t.Fatal(err) - } - - client := fake.NewSimpleDynamicClient(s, tc.existing) - - client.PrependReactor("*", "httpproxies", func(action k8stesting.Action) (bool, runtime.Object, error) { - switch updateAction := action.(type) { - default: - return true, nil, fmt.Errorf("got unexpected action of type: %T", action) - case k8stesting.UpdateActionImpl: - gotObj = updateAction.GetObject() - return true, tc.existing, nil - } - }) + suc := &StatusUpdateCacher{} proxysw := StatusWriter{ - Client: client, - Converter: usc, + Updater: suc, } + suc.AddObject(tc.existing.Name, tc.existing.Namespace, projcontour.HTTPProxyGVR, tc.existing) + if err := proxysw.SetStatus(tc.msg, tc.desc, tc.existing); err != nil { t.Fatal(fmt.Errorf("unable to set proxy status: %s", err)) } - toProxy, err := usc.FromUnstructured(gotObj) - if err != nil { - t.Fatal(err) - } + toProxy := suc.GetObject(tc.existing.Name, tc.existing.Namespace, projcontour.HTTPProxyGVR) if toProxy == nil && tc.expected == nil { return @@ -130,7 +102,16 @@ func TestSetHTTPProxyStatus(t *testing.T) { Description: "this is a valid HTTPProxy", }, }, - expected: nil, + expected: &projcontour.HTTPProxy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + }, + Status: projcontour.Status{ + CurrentStatus: "valid", + Description: "this is a valid HTTPProxy", + }, + }, }) run(t, "replace existing status", testcase{ @@ -172,9 +153,7 @@ func TestGetStatus(t *testing.T) { t.Run(name, func(t *testing.T) { t.Helper() - proxysw := StatusWriter{ - Client: fake.NewSimpleDynamicClient(runtime.NewScheme()), - } + proxysw := StatusWriter{} status, err := proxysw.GetStatus(tc.input) diff --git a/internal/k8s/statusaddress.go b/internal/k8s/statusaddress.go index ec0cde4d5bd..cd12c3acd35 100644 --- a/internal/k8s/statusaddress.go +++ b/internal/k8s/statusaddress.go @@ -73,7 +73,7 @@ func (s *StatusAddressUpdater) OnAdd(obj interface{}) { WithField("ingress-class", annotation.IngressClass(typed)). WithField("defined-ingress-class", s.IngressClass). WithField("kind", kind). - Debug("unmatched ingress class, skip status update") + Debug("unmatched ingress class, skipping status address update") return } @@ -83,7 +83,7 @@ func (s *StatusAddressUpdater) OnAdd(obj interface{}) { WithField("ingress-class", annotation.IngressClass(typed)). WithField("kind", kind). WithField("defined-ingress-class", s.IngressClass). - Debug("Received an object, sending status update") + Debug("received an object, sending status address update") s.StatusUpdater.Update( typed.GetObjectMeta().GetName(), @@ -128,6 +128,7 @@ func (s *StatusAddressUpdater) OnDelete(obj interface{}) { type ServiceStatusLoadBalancerWatcher struct { ServiceName string LBStatus chan v1.LoadBalancerStatus + Log logrus.FieldLogger } func (s *ServiceStatusLoadBalancerWatcher) OnAdd(obj interface{}) { @@ -139,6 +140,10 @@ func (s *ServiceStatusLoadBalancerWatcher) OnAdd(obj interface{}) { if svc.Name != s.ServiceName { return } + s.Log.WithField("name", svc.Name). + WithField("namespace", svc.Namespace). + Debug("received new service address") + s.notify(svc.Status.LoadBalancer) } @@ -151,6 +156,10 @@ func (s *ServiceStatusLoadBalancerWatcher) OnUpdate(oldObj, newObj interface{}) if svc.Name != s.ServiceName { return } + s.Log.WithField("name", svc.Name). + WithField("namespace", svc.Namespace). + Debug("received new service address") + s.notify(svc.Status.LoadBalancer) } diff --git a/internal/k8s/statusaddress_test.go b/internal/k8s/statusaddress_test.go index 2bf04b2148f..0a51583c204 100644 --- a/internal/k8s/statusaddress_test.go +++ b/internal/k8s/statusaddress_test.go @@ -26,11 +26,27 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" ) +func testLogger(t *testing.T) logrus.FieldLogger { + log := logrus.New() + log.Out = &testWriter{t} + return log +} + +type testWriter struct { + *testing.T +} + +func (t *testWriter) Write(buf []byte) (int, error) { + t.Logf("%s", buf) + return len(buf), nil +} + func TestServiceStatusLoadBalancerWatcherOnAdd(t *testing.T) { lbstatus := make(chan v1.LoadBalancerStatus, 1) sw := ServiceStatusLoadBalancerWatcher{ ServiceName: "envoy", LBStatus: lbstatus, + Log: testLogger(t), } recv := func() (v1.LoadBalancerStatus, bool) { @@ -74,9 +90,11 @@ func TestServiceStatusLoadBalancerWatcherOnAdd(t *testing.T) { func TestServiceStatusLoadBalancerWatcherOnUpdate(t *testing.T) { lbstatus := make(chan v1.LoadBalancerStatus, 1) + sw := ServiceStatusLoadBalancerWatcher{ ServiceName: "envoy", LBStatus: lbstatus, + Log: testLogger(t), } recv := func() (v1.LoadBalancerStatus, bool) { @@ -122,9 +140,11 @@ func TestServiceStatusLoadBalancerWatcherOnUpdate(t *testing.T) { func TestServiceStatusLoadBalancerWatcherOnDelete(t *testing.T) { lbstatus := make(chan v1.LoadBalancerStatus, 1) + sw := ServiceStatusLoadBalancerWatcher{ ServiceName: "envoy", LBStatus: lbstatus, + Log: testLogger(t), } recv := func() (v1.LoadBalancerStatus, bool) { diff --git a/internal/k8s/statusupdater.go b/internal/k8s/statusupdater.go index f014078e3b5..02e561867f6 100644 --- a/internal/k8s/statusupdater.go +++ b/internal/k8s/statusupdater.go @@ -78,7 +78,9 @@ func (suh *StatusUpdateHandler) Start(stop <-chan struct{}) error { continue } - suh.Log.WithField("fullname", upd.FullName).Debug("received a status update") + suh.Log.WithField("name", upd.FullName.Name). + WithField("namespace", upd.FullName.Namespace). + Debug("received a status update") uObj, err := suh.Clients.DynamicClient(). Resource(upd.Resource). Namespace(upd.FullName.Namespace).Get(context.TODO(), upd.FullName.Name, metav1.GetOptions{})