Skip to content

Commit

Permalink
controller: - Replace panics with errors
Browse files Browse the repository at this point in the history
            - Add context to errors for debugging
            - Refactor init() code so ipset isn't required to run
              "kube-router --help" for example
  • Loading branch information
bzub committed Jul 6, 2017
1 parent a757ea3 commit cb661f8
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 65 deletions.
52 changes: 30 additions & 22 deletions app/controllers/network_policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGro

if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
glog.Infof("Performing periodic syn of the iptables to reflect network policies")
npc.Sync()
err := npc.Sync()
if err != nil {
glog.Errorf("Error during periodic sync: ", err)
}
} else {
continue
}
Expand All @@ -109,7 +112,10 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGro
func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) {
glog.Infof("Received pod update namspace:%s pod name:%s", podUpdate.Pod.Namespace, podUpdate.Pod.Name)
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
npc.Sync()
err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on pod update: ", err)
}
} else {
glog.Infof("Received pod update, but controller not in sync")
}
Expand All @@ -118,7 +124,10 @@ func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) {
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *watchers.NetworkPolicyUpdate) {
glog.Infof("Received network policy update namspace:%s policy name:%s", networkPolicyUpdate.NetworkPolicy.Namespace, networkPolicyUpdate.NetworkPolicy.Name)
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
npc.Sync()
err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on network policy update: ", err)
}
} else {
glog.Infof("Received network policy update, but controller not in sync")
}
Expand All @@ -127,47 +136,53 @@ func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *w
func (npc *NetworkPolicyController) OnNamespaceUpdate(namespaceUpdate *watchers.NamespaceUpdate) {
glog.Infof("Received namesapce update namspace:%s", namespaceUpdate.Namespace.Name)
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
npc.Sync()
err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on namespace update: ", err)
}
} else {
glog.Infof("Received namspace update, but controller not in sync")
}
}

// Sync synchronizes iptables to desired state of network policies
func (npc *NetworkPolicyController) Sync() {
func (npc *NetworkPolicyController) Sync() error {

var err error
npc.mu.Lock()
defer npc.mu.Unlock()

_, err = exec.LookPath("ipset")
if err != nil {
return errors.New("Ensure ipset package is installed: " + err.Error())
}

start := time.Now()
defer func() {
glog.Infof("sync iptables took %v", time.Since(start))
}()

npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo()
if err != nil {
glog.Errorf("Aborting sync. Failed to build network policies: %s", err.Error())
return
return errors.New("Aborting sync. Failed to build network policies: %s" + err.Error())
}

activePolicyChains, err := npc.syncNetworkPolicyChains()
if err != nil {
glog.Errorf("Aborting sync. Failed to sync network policy chains: %s", err.Error())
return
return errors.New("Aborting sync. Failed to sync network policy chains: %s" + err.Error())
}

activePodFwChains, err := npc.syncPodFirewallChains()
if err != nil {
glog.Errorf("Aborting sync. Failed to sync pod firewalls: %s", err.Error())
return
return errors.New("Aborting sync. Failed to sync pod firewalls: %s" + err.Error())
}

err = cleanupStaleRules(activePolicyChains, activePodFwChains)
if err != nil {
glog.Errorf("Aborting sync. Failed to cleanup stale iptable rules: %s", err.Error())
return
return errors.New("Aborting sync. Failed to cleanup stale iptable rules: %s" + err.Error())
}

return nil
}

// Configure iptable rules representing each network policy. All pod's matched by
Expand Down Expand Up @@ -763,13 +778,6 @@ func (npc *NetworkPolicyController) Cleanup() {
glog.Infof("Successfully cleaned the iptables configuration done by kube-router")
}

func init() {
_, err := exec.LookPath("ipset")
if err != nil {
panic("ipset command not found ensure ipset package is installed")
}
}

func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options.KubeRouterConfig) (*NetworkPolicyController, error) {

npc := NetworkPolicyController{}
Expand All @@ -778,14 +786,14 @@ func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options

node, err := utils.GetNodeObject(clientset, config.HostnameOverride)
if err != nil {
panic(err.Error())
return nil, err
}

npc.nodeHostName = node.Name

nodeIP, err := getNodeIP(node)
if err != nil {
panic(err.Error())
return nil, err
}
npc.nodeIP = nodeIP

Expand Down
55 changes: 28 additions & 27 deletions app/controllers/network_routes_controller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controllers

import (
"errors"
"fmt"
"net"
"strconv"
Expand Down Expand Up @@ -85,15 +86,17 @@ func (nrc *NetworkRoutingController) Run(stopCh <-chan struct{}, wg *sync.WaitGr

// Wait till we are ready to launch BGP server
for {
ok := nrc.startBgpServer()
if !ok {
err := nrc.startBgpServer()
if err != nil {
glog.Errorf("Failed to start node BGP server: %s", err)
select {
case <-stopCh:
glog.Infof("Shutting down network routes controller")
return
case <-t.C:
glog.Infof("Retrying start of node BGP server")
continue
}
continue
} else {
break
}
Expand Down Expand Up @@ -348,27 +351,25 @@ func (nrc *NetworkRoutingController) OnNodeUpdate(nodeUpdate *watchers.NodeUpdat
}
}

func (nrc *NetworkRoutingController) startBgpServer() bool {
func (nrc *NetworkRoutingController) startBgpServer() error {

var nodeAsnNumber uint32
node, err := utils.GetNodeObject(nrc.clientset, nrc.hostnameOverride)
if err != nil {
panic("Failed to get node object from api server due to " + err.Error())
return errors.New("Failed to get node object from api server: " + err.Error())
}

if nrc.bgpFullMeshMode {
nodeAsnNumber = nrc.defaultNodeAsnNumber
} else {
nodeasn, ok := node.ObjectMeta.Annotations["net.kuberouter.nodeasn"]
if !ok {
glog.Infof("Could not find ASN number for the node. Node need to be annotated with ASN number details to start BGP server.")
return false
return errors.New("Could not find ASN number for the node. Node need to be annotated with ASN number details to start BGP server.")
} else {
glog.Infof("Found ASN for the node to be %s from the node annotations", nodeasn)
asnNo, err := strconv.ParseUint(nodeasn, 0, 32)
if err != nil {
glog.Errorf("Failed to parse ASN number specified for the the node")
return false
return errors.New("Failed to parse ASN number specified for the the node")
}
nodeAsnNumber = uint32(asnNo)
}
Expand All @@ -389,7 +390,7 @@ func (nrc *NetworkRoutingController) startBgpServer() bool {
}

if err := nrc.bgpServer.Start(global); err != nil {
panic("Failed to start BGP server due to : " + err.Error())
return errors.New("Failed to start BGP server due to : " + err.Error())
}

go nrc.watchBgpUpdates()
Expand All @@ -405,38 +406,38 @@ func (nrc *NetworkRoutingController) startBgpServer() bool {
},
}
if err := nrc.bgpServer.AddNeighbor(n); err != nil {
panic("Failed to peer with global peer router due to: " + peer)
return errors.New("Failed to peer with global peer router \"" + peer + "\" due to: " + err.Error())
}
}
} else {
nodeBgpPeerAsn, ok := node.ObjectMeta.Annotations["net.kuberouter.node.bgppeer.asn"]
if !ok {
glog.Infof("Could not find BGP peer info for the node in the node annotations so skipping configuring peer.")
return true
return nil
}
asnNo, err := strconv.ParseUint(nodeBgpPeerAsn, 0, 32)
if err != nil {
panic("Failed to parse ASN number specified for the the node in the annotations")
return errors.New("Failed to parse ASN number specified for the the node in the annotations")
}
peerAsnNo := uint32(asnNo)

nodeBgpPeersAnnotation, ok := node.ObjectMeta.Annotations["net.kuberouter.node.bgppeer.address"]
if !ok {
glog.Infof("Could not find BGP peer info for the node in the node annotations so skipping configuring peer.")
return true
return nil
}
nodePeerRouters := make([]string, 0)
if strings.Contains(nodeBgpPeersAnnotation, ",") {
ips := strings.Split(nodeBgpPeersAnnotation, ",")
for _, ip := range ips {
if net.ParseIP(ip) == nil {
panic("Invalid node BGP peer router ip in the annotation: " + ip)
return errors.New("Invalid node BGP peer router ip in the annotation: " + ip)
}
}
nodePeerRouters = append(nodePeerRouters, ips...)
} else {
if net.ParseIP(nodeBgpPeersAnnotation) == nil {
panic("Invalid node BGP peer router ip: " + nodeBgpPeersAnnotation)
return errors.New("Invalid node BGP peer router ip: " + nodeBgpPeersAnnotation)
}
nodePeerRouters = append(nodePeerRouters, nodeBgpPeersAnnotation)
}
Expand All @@ -449,14 +450,14 @@ func (nrc *NetworkRoutingController) startBgpServer() bool {
},
}
if err := nrc.bgpServer.AddNeighbor(n); err != nil {
panic("Failed to peer with node specific BGP peer router: " + peer + " due to " + err.Error())
return errors.New("Failed to peer with node specific BGP peer router: " + peer + " due to " + err.Error())
}
}

glog.Infof("Successfully configured %s in ASN %v as BGP peer to the node", nodeBgpPeersAnnotation, peerAsnNo)
}

return true
return nil
}

func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConfig *options.KubeRouterConfig) (*NetworkRoutingController, error) {
Expand All @@ -471,10 +472,10 @@ func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConf
if len(kubeRouterConfig.ClusterAsn) != 0 {
asn, err := strconv.ParseUint(kubeRouterConfig.ClusterAsn, 0, 32)
if err != nil {
panic("Invalid cluster ASN: " + err.Error())
return nil, errors.New("Invalid cluster ASN: " + err.Error())
}
if asn > 65534 || asn < 64512 {
panic("Invalid ASN number for cluster ASN")
return nil, errors.New("Invalid ASN number for cluster ASN")
}
nrc.defaultNodeAsnNumber = uint32(asn)
} else {
Expand All @@ -485,7 +486,7 @@ func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConf

if (len(kubeRouterConfig.PeerRouter) != 0 && len(kubeRouterConfig.PeerAsn) == 0) ||
(len(kubeRouterConfig.PeerRouter) == 0 && len(kubeRouterConfig.PeerAsn) != 0) {
panic("Either both or none of the params --peer-asn, --peer-router must be specified")
return nil, errors.New("Either both or none of the params --peer-asn, --peer-router must be specified")
}

if len(kubeRouterConfig.PeerRouter) != 0 && len(kubeRouterConfig.PeerAsn) != 0 {
Expand All @@ -494,39 +495,39 @@ func NewNetworkRoutingController(clientset *kubernetes.Clientset, kubeRouterConf
ips := strings.Split(kubeRouterConfig.PeerRouter, ",")
for _, ip := range ips {
if net.ParseIP(ip) == nil {
panic("Invalid global BGP peer router ip: " + kubeRouterConfig.PeerRouter)
return nil, errors.New("Invalid global BGP peer router ip: " + kubeRouterConfig.PeerRouter)
}
}
nrc.globalPeerRouters = append(nrc.globalPeerRouters, ips...)

} else {
if net.ParseIP(kubeRouterConfig.PeerRouter) == nil {
panic("Invalid global BGP peer router ip: " + kubeRouterConfig.PeerRouter)
return nil, errors.New("Invalid global BGP peer router ip: " + kubeRouterConfig.PeerRouter)
}
nrc.globalPeerRouters = append(nrc.globalPeerRouters, kubeRouterConfig.PeerRouter)
}

asn, err := strconv.ParseUint(kubeRouterConfig.PeerAsn, 0, 32)
if err != nil {
panic("Invalid global BGP peer ASN: " + err.Error())
return nil, errors.New("Invalid global BGP peer ASN: " + err.Error())
}
if asn > 65534 {
panic("Invalid ASN number for global BGP peer")
return nil, errors.New("Invalid ASN number for global BGP peer")
}
nrc.globalPeerAsnNumber = uint32(asn)
}

nrc.hostnameOverride = kubeRouterConfig.HostnameOverride
node, err := utils.GetNodeObject(clientset, nrc.hostnameOverride)
if err != nil {
panic(err.Error())
return nil, errors.New("Failed getting node object from API server: " + err.Error())
}

nrc.nodeHostName = node.Name

nodeIP, err := getNodeIP(node)
if err != nil {
panic(err.Error())
return nil, errors.New("Failed getting IP address from node object: " + err.Error())
}
nrc.nodeIP = nodeIP

Expand Down
Loading

0 comments on commit cb661f8

Please sign in to comment.