Skip to content

Commit

Permalink
Support backside re-encryption
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Sardo committed Apr 12, 2017
1 parent 7c635a8 commit 06f03c4
Show file tree
Hide file tree
Showing 16 changed files with 903 additions and 378 deletions.
178 changes: 127 additions & 51 deletions controllers/gce/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

compute "google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/util/sets"
api_v1 "k8s.io/client-go/pkg/api/v1"

"k8s.io/ingress/controllers/gce/healthchecks"
"k8s.io/ingress/controllers/gce/instances"
Expand Down Expand Up @@ -75,6 +76,7 @@ type Backends struct {
nodePool instances.NodePool
healthChecker healthchecks.HealthChecker
snapshotter storage.Snapshotter
prober probeProvider
// ignoredPorts are a set of ports excluded from GC, even
// after the Ingress has been deleted. Note that invoking
// a Delete() on these ports will still delete the backend.
Expand All @@ -86,6 +88,12 @@ func portKey(port int64) string {
return fmt.Sprintf("%d", port)
}

// ServicePort for tupling port and protocol
type ServicePort struct {
Port int64
Protocol utils.AppProtocol
}

// NewBackendPool returns a new backend pool.
// - cloud: implements BackendServices and syncs backends with a cloud provider
// - healthChecker: is capable of producing health checks for backends.
Expand Down Expand Up @@ -134,6 +142,10 @@ func NewBackendPool(
return backendPool
}

func (b *Backends) Init(pp probeProvider) {
b.prober = pp
}

// Get returns a single backend.
func (b *Backends) Get(port int64) (*compute.BackendService, error) {
be, err := b.cloud.GetBackendService(b.namer.BeName(port))
Expand All @@ -144,44 +156,33 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) {
return be, nil
}

func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) {
// Create a new health check
if err := b.healthChecker.Add(namedPort.Port); err != nil {
return nil, err
}
hc, err := b.healthChecker.Get(namedPort.Port)
if err != nil {
return nil, err
func (b *Backends) ensureHealthCheck(port int64, protocol utils.AppProtocol) (string, error) {
hc := b.healthChecker.New(port, protocol)
if b.prober != nil {
probe, err := b.prober.GetProbe(port)
if err != nil {
return "", err
}
if probe != nil {
applyProbeSettingsToHC(probe, hc)
}
}
errs := []string{}

return b.healthChecker.Sync(hc)
}

func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, hcLink string, protocol utils.AppProtocol, name string) (*compute.BackendService, error) {
var errs []string
// We first try to create the backend with balancingMode=RATE. If this
// fails, it's mostly likely because there are existing backends with
// balancingMode=UTILIZATION. This failure mode throws a googleapi error
// which wraps a HTTP 400 status code. We handle it in the loop below
// and come around to retry with the right balancing mode. The goal is to
// switch everyone to using RATE.
for _, bm := range []BalancingMode{Rate, Utilization} {
backends := getBackendsForIGs(igs)
for _, b := range backends {
switch bm {
case Rate:
b.MaxRate = maxRPS
default:
// TODO: Set utilization and connection limits when we accept them
// as valid fields.
}
b.BalancingMode = string(bm)
}
// Create a new backend
backend := &compute.BackendService{
Name: name,
Protocol: "HTTP",
Backends: backends,
HealthChecks: []string{hc.SelfLink},
Port: namedPort.Port,
PortName: namedPort.Name,
}
if err := b.cloud.CreateBackendService(backend); err != nil {
bs := newBackendService(igs, bm, namedPort, []string{hcLink}, protocol, name)
if err := b.cloud.CreateBackendService(bs); err != nil {
// This is probably a failure because we tried to create the backend
// with balancingMode=RATE when there are already backends with
// balancingMode=UTILIZATION. Just ignore it and retry setting
Expand All @@ -198,31 +199,88 @@ func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.Named
return nil, fmt.Errorf("%v", strings.Join(errs, "\n"))
}

func newBackendService(igs []*compute.InstanceGroup, bm BalancingMode, namedPort *compute.NamedPort, healthCheckLinks []string, protocol utils.AppProtocol, name string) *compute.BackendService {
backends := getBackendsForIGs(igs)
for _, b := range backends {
switch bm {
case Rate:
b.MaxRatePerInstance = maxRPS
default:
// TODO: Set utilization and connection limits when we accept them
// as valid fields.
}
b.BalancingMode = string(bm)
}

return &compute.BackendService{
Name: name,
Protocol: string(protocol),
Backends: backends,
HealthChecks: healthCheckLinks,
Port: namedPort.Port,
PortName: namedPort.Name,
}
}

func (b *Backends) updateProtocol(bs *compute.BackendService, hcLink string, protocol utils.AppProtocol) (*compute.BackendService, error) {

return bs, nil
}

// Add will get or create a Backend for the given port.
func (b *Backends) Add(port int64) error {
func (b *Backends) Add(p ServicePort) error {
// We must track the port even if creating the backend failed, because
// we might've created a health-check for it.
be := &compute.BackendService{}
defer func() { b.snapshotter.Add(portKey(port), be) }()
defer func() { b.snapshotter.Add(portKey(p.Port), be) }()

igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), port)
igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), p.Port)
if err != nil {
return err
}
be, _ = b.Get(port)

// Ensure health check for backend service exists
hcLink, err := b.ensureHealthCheck(p.Port, p.Protocol)
if err != nil {
return err
}

pName := b.namer.BeName(p.Port)
be, _ = b.Get(p.Port)
if be == nil {
glog.Infof("Creating backend for %d instance groups, port %v named port %v",
len(igs), port, namedPort)
be, err = b.create(igs, namedPort, b.namer.BeName(port))
glog.Infof("Creating backend for %d instance groups, port %v named port %v", len(igs), p.Port, namedPort)
be, err = b.create(igs, namedPort, hcLink, p.Protocol, pName)
if err != nil {
return err
}
}

existingHCLink := ""
if len(be.HealthChecks) == 1 {
existingHCLink = be.HealthChecks[0]
}

if be.Protocol != string(p.Protocol) || existingHCLink != hcLink {
glog.Infof("Updating backend protocol %v (%v) for change in protocol or health check", pName, string(p.Protocol))
be.Protocol = string(p.Protocol)
be.HealthChecks = []string{hcLink}
if err = b.cloud.UpdateBackendService(be); err != nil {
return err
}
}

// If we updated the backend service above, delete the legacy health check
if existingHCLink != hcLink && strings.Contains(existingHCLink, "/httpHealthChecks/") {
if err = b.healthChecker.DeleteLegacy(p.Port); err != nil {
return err
}
}

// we won't find any igs till the node pool syncs nodes.
if len(igs) == 0 {
return nil
}
if err := b.edgeHop(be, igs); err != nil {
if err = b.edgeHop(be, igs); err != nil {
return err
}
return err
Expand All @@ -231,7 +289,7 @@ func (b *Backends) Add(port int64) error {
// Delete deletes the Backend for the given port.
func (b *Backends) Delete(port int64) (err error) {
name := b.namer.BeName(port)
glog.Infof("Deleting backend %v", name)
glog.Infof("Deleting backend service %v", name)
defer func() {
if utils.IsHTTPErrorCode(err, http.StatusNotFound) {
err = nil
Expand All @@ -241,15 +299,11 @@ func (b *Backends) Delete(port int64) (err error) {
}
}()
// Try deleting health checks even if a backend is not found.
if err = b.cloud.DeleteBackendService(name); err != nil &&
!utils.IsHTTPErrorCode(err, http.StatusNotFound) {
if err = b.cloud.DeleteBackendService(name); err != nil && !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
return err
}
if err = b.healthChecker.Delete(port); err != nil &&
!utils.IsHTTPErrorCode(err, http.StatusNotFound) {
return err
}
return nil

return b.healthChecker.Delete(port)
}

// List lists all backends.
Expand Down Expand Up @@ -306,7 +360,7 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr
}

// Sync syncs backend services corresponding to ports in the given list.
func (b *Backends) Sync(svcNodePorts []int64) error {
func (b *Backends) Sync(svcNodePorts []ServicePort) error {
glog.V(3).Infof("Sync: backends %v", svcNodePorts)

// create backends for new ports, perform an edge hop for existing ports
Expand All @@ -319,14 +373,14 @@ func (b *Backends) Sync(svcNodePorts []int64) error {
}

// GC garbage collects services corresponding to ports in the given list.
func (b *Backends) GC(svcNodePorts []int64) error {
func (b *Backends) GC(svcNodePorts []ServicePort) error {
knownPorts := sets.NewString()
for _, port := range svcNodePorts {
knownPorts.Insert(portKey(port))
for _, p := range svcNodePorts {
knownPorts.Insert(portKey(p.Port))
}
pool := b.snapshotter.Snapshot()
for port := range pool {
p, err := strconv.Atoi(port)
p, err := strconv.ParseInt(port, 10, 64)
if err != nil {
return err
}
Expand All @@ -345,7 +399,7 @@ func (b *Backends) GC(svcNodePorts []int64) error {
// Shutdown deletes all backends and the default backend.
// This will fail if one of the backends is being used by another resource.
func (b *Backends) Shutdown() error {
if err := b.GC([]int64{}); err != nil {
if err := b.GC([]ServicePort{}); err != nil {
return err
}
return nil
Expand All @@ -365,3 +419,25 @@ func (b *Backends) Status(name string) string {
// TODO: State transition are important, not just the latest.
return hs.HealthStatus[0].HealthState
}

func applyProbeSettingsToHC(p *api_v1.Probe, hc *healthchecks.HealthCheck) {
healthPath := p.Handler.HTTPGet.Path
// GCE requires a leading "/" for health check urls.
if !strings.HasPrefix(healthPath, "/") {
healthPath = fmt.Sprintf("/%v", healthPath)
}

host := p.Handler.HTTPGet.Host
// remember the ingresses that use this Service so we can send
// the right events

hc.RequestPath = healthPath
hc.Host = host
hc.Description = "kubernetes L7 health check from readiness probe."
// set a low health threshold and a high failure threshold.
// We're just trying to detect if the node networking is
// borked, service level outages will get detected sooner
// by kube-proxy.
hc.CheckIntervalSec = int64(p.PeriodSeconds + healthchecks.DefaultHealthCheckInterval)
hc.TimeoutSec = int64(p.TimeoutSeconds)
}
Loading

0 comments on commit 06f03c4

Please sign in to comment.