Commit

cleanup
Nick Sardo committed Apr 7, 2017
1 parent 407fe63 commit 3479c42
Showing 14 changed files with 227 additions and 183 deletions.
74 changes: 60 additions & 14 deletions controllers/gce/backends/backends.go
@@ -27,6 +27,7 @@ import (

compute "google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/util/sets"
api_v1 "k8s.io/client-go/pkg/api/v1"

"k8s.io/ingress/controllers/gce/healthchecks"
"k8s.io/ingress/controllers/gce/instances"
@@ -75,6 +76,7 @@ type Backends struct {
nodePool instances.NodePool
healthChecker healthchecks.HealthChecker
snapshotter storage.Snapshotter
prober probeProvider
// ignoredPorts are a set of ports excluded from GC, even
// after the Ingress has been deleted. Note that invoking
// a Delete() on these ports will still delete the backend.
@@ -140,6 +142,10 @@ func NewBackendPool(
return backendPool
}

func (b *Backends) Init(pp probeProvider) {
b.prober = pp
}

// Get returns a single backend.
func (b *Backends) Get(port int64) (*compute.BackendService, error) {
be, err := b.cloud.GetBackendService(b.namer.BeName(port))
@@ -150,27 +156,45 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) {
return be, nil
}

func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, encrypted bool, name string) (*compute.BackendService, error) {
// Create a new health check
if err := b.healthChecker.Add(namedPort.Port, encrypted); err != nil {
return nil, err
func (b *Backends) ensureHealthCheck(port int64, encrypted bool) (string, error) {
hc := b.healthChecker.New(port, encrypted)
if b.prober != nil {
probe, err := b.prober.GetProbe(port)
if err != nil {
return "", err
}
if probe != nil {
applyProbeSettingsToHC(probe, hc)
}
}

if err := b.healthChecker.Sync(hc); err != nil {
return "", err
}
//TODO: Avoid this second call
hc, err := b.healthChecker.Get(port, encrypted)
if err != nil {
return "", err
}
return hc.SelfLink, nil
}

hc, err := b.healthChecker.Get(namedPort.Port, encrypted)
func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, encrypted bool, name string) (*compute.BackendService, error) {
hclink, err := b.ensureHealthCheck(namedPort.Port, encrypted)
if err != nil {
return nil, err
}

var errs []string
// We first try to create the backend with balancingMode=RATE. If this
// fails, it's most likely because there are existing backends with
// balancingMode=UTILIZATION. This failure mode throws a googleapi error
// balancingMode=UTILIZATION. This failure mode throws a googleapi_v1 error
// which wraps a HTTP 400 status code. We handle it in the loop below
// and come around to retry with the right balancing mode. The goal is to
// switch everyone to using RATE.
for _, bm := range []BalancingMode{Rate, Utilization} {
// Create a new backend
bs := newBackendService(igs, bm, namedPort, []string{hc.SelfLink}, encrypted, name)
bs := newBackendService(igs, bm, namedPort, []string{hclink}, encrypted, name)
if err := b.cloud.CreateBackendService(bs); err != nil {
// This is probably a failure because we tried to create the backend
// with balancingMode=RATE when there are already backends with
@@ -213,23 +237,22 @@ func newBackendService(igs []*compute.InstanceGroup, bm BalancingMode, namedPort

func (b *Backends) updateProtocol(bs *compute.BackendService, encrypted bool) (*compute.BackendService, error) {
// Create healthcheck with proper protocol
if err := b.healthChecker.Add(bs.Port, encrypted); err != nil {
return nil, err
}
hc, err := b.healthChecker.Get(bs.Port, encrypted)
hclink, err := b.ensureHealthCheck(bs.Port, encrypted)
if err != nil {
return nil, err
}

bs.Protocol = utils.GetHTTPScheme(encrypted)
bs.HealthChecks = []string{hc.SelfLink}
bs.HealthChecks = []string{hclink}

if err = b.cloud.UpdateBackendService(bs); err != nil {
return bs, err
}

// Attempt delete of previous healthcheck; ignore err because there's no harm for it to sit around
b.healthChecker.Delete(bs.Port, !encrypted)
// Attempt delete of previous healthcheck; warn that err occurred
if err = b.healthChecker.Delete(bs.Port, !encrypted); err != nil {
glog.Warningf("Failed to delete %v healthcheck for port %v, err: %v", utils.GetHTTPScheme(!encrypted), bs.Port, err)
}

return bs, nil
}
@@ -284,6 +307,7 @@ func (b *Backends) Delete(port int64) (err error) {
b.snapshotter.Delete(portKey(port))
}
}()
// Try deleting health checks even if a backend is not found.
if err = b.cloud.DeleteBackendService(name); err != nil &&
!utils.IsHTTPErrorCode(err, http.StatusNotFound) {
return err
@@ -415,3 +439,25 @@ func (b *Backends) Status(name string) string {
// TODO: State transitions are important, not just the latest.
return hs.HealthStatus[0].HealthState
}

func applyProbeSettingsToHC(p *api_v1.Probe, hc *utils.HealthCheck) {
healthPath := p.Handler.HTTPGet.Path
// GCE requires a leading "/" for health check urls.
if !strings.HasPrefix(healthPath, "/") {
healthPath = fmt.Sprintf("/%v", healthPath)
}

host := p.Handler.HTTPGet.Host
// remember the ingresses that use this Service so we can send
// the right events

hc.RequestPath = healthPath
hc.Host = host
hc.Description = "kubernetes L7 health check from readiness probe."
// set a low health threshold and a high failure threshold.
// We're just trying to detect if the node networking is
// borked, service level outages will get detected sooner
// by kube-proxy.
hc.CheckIntervalSec = int64(p.PeriodSeconds + utils.DefaultHealthCheckInterval)
hc.TimeoutSec = int64(p.TimeoutSeconds)
}
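
Not part of the commit, but possibly useful alongside applyProbeSettingsToHC above: a minimal companion test sketch, assuming it sits in the same backends package and that the utils helpers behave as the diff shows. It spells out the mapping with concrete numbers: the leading "/" added to the request path, the interval formula, and the verbatim timeout.

package backends

import (
	"testing"

	"k8s.io/apimachinery/pkg/util/intstr"
	api_v1 "k8s.io/client-go/pkg/api/v1"

	"k8s.io/ingress/controllers/gce/utils"
)

// TestApplyProbeSettingsToHCExample is a hypothetical sketch mirroring
// TestApplyProbeSettingsToHC below, with concrete probe values.
func TestApplyProbeSettingsToHCExample(t *testing.T) {
	probe := &api_v1.Probe{
		Handler: api_v1.Handler{
			HTTPGet: &api_v1.HTTPGetAction{
				Scheme: api_v1.URISchemeHTTP,
				Path:   "healthz", // deliberately no leading "/"
				Port:   intstr.FromInt(80),
			},
		},
		PeriodSeconds:  10,
		TimeoutSeconds: 5,
	}

	// Start from the default template for an encrypted backend on port 8080.
	hc := utils.DefaultHealthCheckTemplate(8080, true)
	applyProbeSettingsToHC(probe, hc)

	if hc.RequestPath != "/healthz" {
		t.Errorf("RequestPath = %q, want /healthz (leading slash added for GCE)", hc.RequestPath)
	}
	if hc.CheckIntervalSec != int64(probe.PeriodSeconds+utils.DefaultHealthCheckInterval) {
		t.Errorf("CheckIntervalSec = %d, want PeriodSeconds + DefaultHealthCheckInterval", hc.CheckIntervalSec)
	}
	if hc.TimeoutSec != int64(probe.TimeoutSeconds) {
		t.Errorf("TimeoutSec = %d, want %d", hc.TimeoutSec, probe.TimeoutSeconds)
	}
}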
48 changes: 45 additions & 3 deletions controllers/gce/backends/backends_test.go
@@ -23,7 +23,9 @@ import (

compute "google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
api_v1 "k8s.io/client-go/pkg/api/v1"

"k8s.io/ingress/controllers/gce/healthchecks"
"k8s.io/ingress/controllers/gce/instances"
@@ -35,14 +37,28 @@ const defaultZone = "zone-a"

var noOpErrFunc = func(op int, be *compute.BackendService) error { return nil }

var existingProbe = &api_v1.Probe{
Handler: api_v1.Handler{
HTTPGet: &api_v1.HTTPGetAction{
Scheme: api_v1.URISchemeHTTP,
Path: "/my-special-path",
Port: intstr.IntOrString{
Type: intstr.Int,
IntVal: 80,
},
},
},
}

func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) *Backends {
namer := &utils.Namer{}
nodePool := instances.NewNodePool(fakeIGs)
nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
healthChecks := healthchecks.NewHealthChecker(healthchecks.NewFakeHealthCheckProvider(), "/", namer)
healthChecks.Init(&healthchecks.FakeHealthCheckGetter{DefaultHealthCheck: nil})
return NewBackendPool(
f, healthChecks, nodePool, namer, []int64{}, syncWithCloud)
bp := NewBackendPool(f, healthChecks, nodePool, namer, []int64{}, syncWithCloud)
probes := map[int64]*api_v1.Probe{80: existingProbe}
bp.Init(NewFakeProbeProvider(probes))
return bp
}

func TestBackendPoolAdd(t *testing.T) {
@@ -367,3 +383,29 @@ func TestBackendCreateBalancingMode(t *testing.T) {
pool.GC([]ServicePort{})
}
}

func TestApplyProbeSettingsToHC(t *testing.T) {
p := "healthz"
hc := utils.DefaultHealthCheckTemplate(8080, true)
probe := &api_v1.Probe{
Handler: api_v1.Handler{
HTTPGet: &api_v1.HTTPGetAction{
Scheme: api_v1.URISchemeHTTP,
Path: p,
Port: intstr.IntOrString{
Type: intstr.Int,
IntVal: 80,
},
},
},
}

applyProbeSettingsToHC(probe, hc)

if hc.Encrypted != true || hc.Port != 8080 {
t.Errorf("Basic HC settings changed")
}
if hc.RequestPath != "/"+p {
t.Errorf("Failed to apply probe's requestpath")
}
}
18 changes: 18 additions & 0 deletions controllers/gce/backends/fakes.go
@@ -20,6 +20,7 @@ import (
"fmt"

compute "google.golang.org/api/compute/v1"
api_v1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"

"k8s.io/ingress/controllers/gce/utils"
@@ -118,3 +119,20 @@ func (f *FakeBackendServices) GetHealth(name, instanceGroupLink string) (*comput
return &compute.BackendServiceGroupHealth{
HealthStatus: states}, nil
}

// FakeProbeProvider implements the probeProvider interface for tests.
type FakeProbeProvider struct {
probes map[int64]*api_v1.Probe
}

func NewFakeProbeProvider(probes map[int64]*api_v1.Probe) *FakeProbeProvider {
return &FakeProbeProvider{probes}
}

// GetProbe returns the probe for a given nodePort
func (pp *FakeProbeProvider) GetProbe(port int64) (*api_v1.Probe, error) {
if probe, exists := pp.probes[port]; exists {
return probe, nil
}
return nil, nil
}
7 changes: 7 additions & 0 deletions controllers/gce/backends/interfaces.go
@@ -18,11 +18,18 @@ package backends

import (
compute "google.golang.org/api/compute/v1"
api_v1 "k8s.io/client-go/pkg/api/v1"
)

// ProbeProvider retrieves a probe struct given a nodePort
type probeProvider interface {
GetProbe(nodePort int64) (*api_v1.Probe, error)
}

// BackendPool is an interface to manage a pool of kubernetes nodePort services
// as gce backendServices, and sync them through the BackendServices interface.
type BackendPool interface {
Init(p probeProvider)
Add(port ServicePort) error
Get(port int64) (*compute.BackendService, error)
Delete(port int64) error
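
A rough illustration of the new probeProvider contract (again, not part of the commit): anything with a matching GetProbe method can be passed to BackendPool.Init. The staticProbeProvider below is a hypothetical example; in this commit the real provider is the GCETranslator (wired up in cluster_manager.go below), and the tests use FakeProbeProvider from fakes.go above.

package backends

import (
	api_v1 "k8s.io/client-go/pkg/api/v1"
)

// staticProbeProvider is a hypothetical probeProvider that serves readiness
// probes from a fixed map keyed by nodePort.
type staticProbeProvider struct {
	probes map[int64]*api_v1.Probe
}

// GetProbe satisfies probeProvider. Returning a nil probe (and nil error)
// means "no probe known for this port"; ensureHealthCheck then leaves the
// default health check settings untouched.
func (s *staticProbeProvider) GetProbe(nodePort int64) (*api_v1.Probe, error) {
	return s.probes[nodePort], nil
}

// Example wiring (assuming pool is a *Backends and probe an *api_v1.Probe):
//   pool.Init(&staticProbeProvider{probes: map[int64]*api_v1.Probe{80: probe}})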
4 changes: 1 addition & 3 deletions controllers/gce/controller/cluster_manager.go
@@ -83,9 +83,7 @@ type ClusterManager struct {
// Init initializes the cluster manager.
func (c *ClusterManager) Init(tr *GCETranslator) {
c.instancePool.Init(tr)
for _, h := range c.healthCheckers {
h.Init(tr)
}
c.backendPool.Init(tr)
// TODO: Initialize other members as needed.
}

34 changes: 17 additions & 17 deletions controllers/gce/controller/controller.go
@@ -31,7 +31,7 @@ import (
unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1"
listers "k8s.io/client-go/listers/core/v1"
base_api "k8s.io/client-go/pkg/api"
api "k8s.io/client-go/pkg/api/v1"
api_v1 "k8s.io/client-go/pkg/api/v1"
extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
@@ -103,7 +103,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *
CloudClusterManager: clusterManager,
stopCh: make(chan struct{}),
recorder: eventBroadcaster.NewRecorder(base_api.Scheme,
api.EventSource{Component: "loadbalancer-controller"}),
api_v1.EventSource{Component: "loadbalancer-controller"}),
}
lbc.nodeQueue = NewTaskQueue(lbc.syncNodes)
lbc.ingQueue = NewTaskQueue(lbc.sync)
@@ -117,7 +117,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *
glog.Infof("Ignoring add for ingress %v based on annotation %v", addIng.Name, ingressClassKey)
return
}
lbc.recorder.Eventf(addIng, api.EventTypeNormal, "ADD", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name))
lbc.recorder.Eventf(addIng, api_v1.EventTypeNormal, "ADD", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name))
lbc.ingQueue.enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
@@ -157,24 +157,24 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *

lbc.svcLister.Indexer, lbc.svcController = cache.NewIndexerInformer(
cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "services", namespace, fields.Everything()),
&api.Service{},
&api_v1.Service{},
resyncPeriod,
svcHandlers,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)

lbc.podLister.Indexer, lbc.podController = cache.NewIndexerInformer(
cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "pods", namespace, fields.Everything()),
&api.Pod{},
&api_v1.Pod{},
resyncPeriod,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)

// Node watch handlers
lbc.nodeLister.Indexer, lbc.nodeController = cache.NewIndexerInformer(
cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "nodes", api.NamespaceAll, fields.Everything()),
&api.Node{},
cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "nodes", api_v1.NamespaceAll, fields.Everything()),
&api_v1.Node{},
resyncPeriod,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
@@ -189,7 +189,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *

// enqueueIngressForService enqueues all the Ingress' for a Service.
func (lbc *LoadBalancerController) enqueueIngressForService(obj interface{}) {
svc := obj.(*api.Service)
svc := obj.(*api_v1.Service)
ings, err := lbc.ingLister.GetServiceIngress(svc)
if err != nil {
glog.V(5).Infof("ignoring service %v: %v", svc.Name, err)
@@ -311,7 +311,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
eventMsg += " :Quota"
}
if ingExists {
lbc.recorder.Eventf(obj.(*extensions.Ingress), api.EventTypeWarning, eventMsg, err.Error())
lbc.recorder.Eventf(obj.(*extensions.Ingress), api_v1.EventTypeWarning, eventMsg, err.Error())
} else {
err = fmt.Errorf("%v, error: %v", eventMsg, err)
}
@@ -331,10 +331,10 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
if urlMap, err := lbc.tr.toURLMap(&ing); err != nil {
syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err)
} else if err := l7.UpdateUrlMap(urlMap); err != nil {
lbc.recorder.Eventf(&ing, api.EventTypeWarning, "UrlMap", err.Error())
lbc.recorder.Eventf(&ing, api_v1.EventTypeWarning, "UrlMap", err.Error())
syncError = fmt.Errorf("%v, update url map error: %v", syncError, err)
} else if err := lbc.updateIngressStatus(l7, ing); err != nil {
lbc.recorder.Eventf(&ing, api.EventTypeWarning, "Status", err.Error())
lbc.recorder.Eventf(&ing, api_v1.EventTypeWarning, "Status", err.Error())
syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
}
return syncError
@@ -352,8 +352,8 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
return err
}
currIng.Status = extensions.IngressStatus{
LoadBalancer: api.LoadBalancerStatus{
Ingress: []api.LoadBalancerIngress{
LoadBalancer: api_v1.LoadBalancerStatus{
Ingress: []api_v1.LoadBalancerIngress{
{IP: ip},
},
},
@@ -367,7 +367,7 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
if _, err := ingClient.UpdateStatus(currIng); err != nil {
return err
}
lbc.recorder.Eventf(currIng, api.EventTypeNormal, "CREATE", "ip: %v", ip)
lbc.recorder.Eventf(currIng, api_v1.EventTypeNormal, "CREATE", "ip: %v", ip)
}
}
// Update annotations through /update endpoint
@@ -435,11 +435,11 @@ func (lbc *LoadBalancerController) syncNodes(key string) error {
}

func getNodeReadyPredicate() listers.NodeConditionPredicate {
return func(node *api.Node) bool {
return func(node *api_v1.Node) bool {
for ix := range node.Status.Conditions {
condition := &node.Status.Conditions[ix]
if condition.Type == api.NodeReady {
return condition.Status == api.ConditionTrue
if condition.Type == api_v1.NodeReady {
return condition.Status == api_v1.ConditionTrue
}
}
return false
(Diff truncated: the remaining 8 of the 14 changed files were not loaded in this view.)