Skip to content

Commit

Permalink
Merge pull request #166 from nginxinc/watch-secrets
Browse files Browse the repository at this point in the history
Add watching for secret resource updates
  • Loading branch information
pleshakov committed Aug 9, 2017
2 parents e7452bb + 088b677 commit e892dd1
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 60 deletions.
161 changes: 148 additions & 13 deletions nginx-controller/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ type LoadBalancerController struct {
svcController cache.Controller
endpController cache.Controller
cfgmController cache.Controller
secrController cache.Controller
ingLister StoreToIngressLister
svcLister cache.Store
endpLister StoreToEndpointLister
cfgmLister StoreToConfigMapLister
secrLister StoreToSecretLister
syncQueue *taskQueue
stopCh chan struct{}
cnf *nginx.Configurator
Expand Down Expand Up @@ -200,8 +202,49 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, resyncPeriod tim
cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "endpoints", namespace, fields.Everything()),
&api_v1.Endpoints{}, resyncPeriod, endpHandlers)

secrHandlers := cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
remSecr, isSecr := obj.(*api_v1.Secret)
if !isSecr {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.V(3).Infof("Error received unexpected object: %v", obj)
return
}
remSecr, ok = deletedState.Obj.(*api_v1.Secret)
if !ok {
glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-Secret object: %v", deletedState.Obj)
return
}
}
if err := ValidateTLSSecret(remSecr); err != nil {
return
}

glog.V(3).Infof("Removing Secret: %v", remSecr.Name)
lbc.syncQueue.enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
errOld := ValidateTLSSecret(old.(*api_v1.Secret))
errCur := ValidateTLSSecret(cur.(*api_v1.Secret))
if errOld != nil && errCur != nil {
return
}

if !reflect.DeepEqual(old, cur) {
glog.V(3).Infof("Secret %v changed, syncing",
cur.(*api_v1.Secret).Name)
lbc.syncQueue.enqueue(cur)
}
},
}

lbc.secrLister.Store, lbc.secrController = cache.NewInformer(
cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "secrets", namespace, fields.Everything()),
&api_v1.Secret{}, resyncPeriod, secrHandlers)

if nginxConfigMaps != "" {
nginxConfigMapsNS, nginxConfigMapsName, err := parseNginxConfigMaps(nginxConfigMaps)
nginxConfigMapsNS, nginxConfigMapsName, err := parseNamespaceName(nginxConfigMaps)
if err != nil {
glog.Warning(err)
} else {
Expand Down Expand Up @@ -259,6 +302,7 @@ func (lbc *LoadBalancerController) Run() {
go lbc.ingController.Run(lbc.stopCh)
go lbc.svcController.Run(lbc.stopCh)
go lbc.endpController.Run(lbc.stopCh)
go lbc.secrController.Run(lbc.stopCh)
go lbc.syncQueue.run(time.Second, lbc.stopCh)
if lbc.watchNginxConfigMaps {
go lbc.cfgmController.Run(lbc.stopCh)
Expand Down Expand Up @@ -296,8 +340,7 @@ func (lbc *LoadBalancerController) syncEndp(task Task) {
continue
}
glog.V(3).Infof("Updating Endpoints for %v/%v", ing.Name, ing.Namespace)
name := ing.Namespace + "-" + ing.Name
lbc.cnf.UpdateEndpoints(name, ingEx)
lbc.cnf.UpdateEndpoints(ingEx)
if err != nil {
glog.Errorf("Error updating endpoints for %v/%v: %v", ing.Namespace, ing.Name, err)
}
Expand Down Expand Up @@ -507,7 +550,7 @@ func (lbc *LoadBalancerController) syncCfgm(task Task) {

var ingExes []*nginx.IngressEx
ings, _ := lbc.ingLister.List()
for i, _ := range ings.Items {
for i := range ings.Items {
if !isNginxIngress(&ings.Items[i]) {
continue
}
Expand Down Expand Up @@ -551,6 +594,8 @@ func (lbc *LoadBalancerController) sync(task Task) {
case Endpoints:
lbc.syncEndp(task)
return
case Secret:
lbc.syncSecret(task)
}
}

Expand All @@ -562,14 +607,11 @@ func (lbc *LoadBalancerController) syncIng(task Task) {
return
}

// defaut/some-ingress -> default-some-ingress
name := strings.Replace(key, "/", "-", -1)

if !ingExists {
glog.V(2).Infof("Deleting Ingress: %v\n", key)
err := lbc.cnf.DeleteIngress(name)
err := lbc.cnf.DeleteIngress(key)
if err != nil {
glog.Errorf("Error when deleting configuration for %v: %v", name, err)
glog.Errorf("Error when deleting configuration for %v: %v", key, err)
}
} else {
glog.V(2).Infof("Adding or Updating Ingress: %v\n", key)
Expand All @@ -581,7 +623,7 @@ func (lbc *LoadBalancerController) syncIng(task Task) {
lbc.recorder.Eventf(ing, api_v1.EventTypeWarning, "Rejected", "%v was rejected: %v", key, err)
return
}
err = lbc.cnf.AddOrUpdateIngress(name, ingEx)
err = lbc.cnf.AddOrUpdateIngress(ingEx)
if err != nil {
lbc.recorder.Eventf(ing, api_v1.EventTypeWarning, "AddedOrUpdatedWithError", "Configuration for %v was added or updated, but not applied: %v", key, err)
} else {
Expand All @@ -590,6 +632,95 @@ func (lbc *LoadBalancerController) syncIng(task Task) {
}
}

func (lbc *LoadBalancerController) syncSecret(task Task) {
key := task.Key
obj, secrExists, err := lbc.secrLister.Store.GetByKey(key)
if err != nil {
lbc.syncQueue.requeue(task, err)
return
}

_, name, err := parseNamespaceName(key)
if err != nil {
glog.Warningf("Secret key %v is invalid: %v", key, err)
return
}

ings, err := lbc.findIngressesForSecret(name)
if err != nil {
glog.Warningf("Failed to find Ingress resources for Secret %v: %v", key, err)
lbc.syncQueue.requeueAfter(task, err, 5*time.Second)
}

glog.V(2).Infof("Found %v Ingress resources with Secret %v", len(ings), key)

if !secrExists {
glog.V(2).Infof("Deleting Secret: %v\n", key)

if err := lbc.cnf.DeleteTLSSecret(key, ings); err != nil {
glog.Errorf("Error when deleting Secret: %v: %v", key, err)
}

for _, ing := range ings {
lbc.syncQueue.enqueue(&ing)
lbc.recorder.Eventf(&ing, api_v1.EventTypeWarning, "Rejected", "%v/%v was rejected due to deleted Secret %v: %v", ing.Namespace, ing.Name, key)
}
} else {
glog.V(2).Infof("Updating Secret: %v\n", key)

secret := obj.(*api_v1.Secret)

if len(ings) > 0 {
err := ValidateTLSSecret(secret)
if err != nil {
glog.Errorf("Couldn't validate secret %v: %v", key, err)
if err := lbc.cnf.DeleteTLSSecret(key, ings); err != nil {
glog.Errorf("Error when deleting Secret: %v: %v", key, err)
}
for _, ing := range ings {
lbc.syncQueue.enqueue(&ing)
lbc.recorder.Eventf(&ing, api_v1.EventTypeWarning, "Rejected", "%v/%v was rejected due to invalid Secret %v: %v", ing.Namespace, ing.Name, key, err)
}
lbc.recorder.Eventf(secret, api_v1.EventTypeWarning, "Rejected", "%v was rejected: %v", key, err)
return
}

if err := lbc.cnf.AddOrUpdateTLSSecret(secret, true); err != nil {
glog.Errorf("Error when updating Secret %v: %v", key, err)
lbc.recorder.Eventf(secret, api_v1.EventTypeWarning, "UpdatedWithError", "%v was updated, but not applied: %v", key, err)
for _, ing := range ings {
lbc.recorder.Eventf(&ing, api_v1.EventTypeWarning, "UpdatedWithError", "Configuration for %v/%v was updated, but not applied: %v", ing.Namespace, ing.Name, err)
}
} else {
lbc.recorder.Eventf(secret, api_v1.EventTypeNormal, "Updated", "%v was updated", key)
for _, ing := range ings {
lbc.recorder.Eventf(&ing, api_v1.EventTypeNormal, "Updated", "Configuration for %v/%v was updated", ing.Namespace, ing.Name)
}
}
}
}
}

func (lbc *LoadBalancerController) findIngressesForSecret(secret string) ([]extensions.Ingress, error) {
res := []extensions.Ingress{}
ings, err := lbc.ingLister.List()
if err != nil {
return nil, fmt.Errorf("Couldn't get the list of Ingress resources: %v", err)
}
for _, ing := range ings.Items {
if !isNginxIngress(&ing) {
continue
}
for _, tls := range ing.Spec.TLS {
if tls.SecretName == secret {
res = append(res, ing)
}
}
}

return res, nil
}

func (lbc *LoadBalancerController) enqueueIngressForService(svc *api_v1.Service) {
ings := lbc.getIngressesForService(svc)
for _, ing := range ings {
Expand Down Expand Up @@ -636,6 +767,10 @@ func (lbc *LoadBalancerController) createIngress(ing *extensions.Ingress) (*ngin
if err != nil {
return nil, fmt.Errorf("Error retrieving secret %v for Ingress %v: %v", secretName, ing.Name, err)
}
err = ValidateTLSSecret(secret)
if err != nil {
return nil, fmt.Errorf("Error validating secret %v for Ingress %v: %v", secretName, ing.Name, err)
}
ingEx.Secrets[secretName] = secret
}

Expand Down Expand Up @@ -768,10 +903,10 @@ func (lbc *LoadBalancerController) getServiceForIngressBackend(backend *extensio
return nil, fmt.Errorf("service %s doesn't exists", svcKey)
}

func parseNginxConfigMaps(nginxConfigMaps string) (string, string, error) {
res := strings.Split(nginxConfigMaps, "/")
func parseNamespaceName(value string) (ns string, name string, err error) {
res := strings.Split(value, "/")
if len(res) != 2 {
return "", "", fmt.Errorf("NGINX configmaps name must follow the format <namespace>/<name>, got: %v", nginxConfigMaps)
return "", "", fmt.Errorf("%v must follow the format <namespace>/<name>", value)
}
return res[0], res[1], nil
}
Expand Down
20 changes: 20 additions & 0 deletions nginx-controller/controller/secret.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package controller

import (
"fmt"

api_v1 "k8s.io/client-go/pkg/api/v1"
)

// ValidateTLSSecret validates the secret. If it is valid, the function returns nil.
func ValidateTLSSecret(secret *api_v1.Secret) error {
if _, exists := secret.Data[api_v1.TLSCertKey]; !exists {
return fmt.Errorf("Secret doesn't have %v", api_v1.TLSCertKey)
}

if _, exists := secret.Data[api_v1.TLSPrivateKeyKey]; !exists {
return fmt.Errorf("Secret doesn't have %v", api_v1.TLSPrivateKeyKey)
}

return nil
}
4 changes: 4 additions & 0 deletions nginx-controller/controller/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ const (
Endpoints
// ConfigMap resource
ConfigMap
// Secret resource
Secret
)

// Task is an element of a taskQueue
Expand All @@ -136,6 +138,8 @@ func NewTask(key string, obj interface{}) (Task, error) {
k = Endpoints
case *api_v1.ConfigMap:
k = ConfigMap
case *api_v1.Secret:
k = Secret
default:
return Task{}, fmt.Errorf("Unknow type: %v", t)
}
Expand Down
Loading

0 comments on commit e892dd1

Please sign in to comment.