Skip to content

Commit

Permalink
fix: add locking around ipset invocations
Browse files (browse the repository at this point in the history)
  • Loading branch information
aauren committed May 25, 2021
1 parent cb3a91a commit fdfa27d
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 11 deletions.
8 changes: 5 additions & 3 deletions pkg/cmd/kube-router.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func CleanupConfigAndExit() {
// Run starts the controllers and waits forever till we get SIGINT or SIGTERM
func (kr *KubeRouter) Run() error {
var err error
var ipsetMutex sync.Mutex
var wg sync.WaitGroup
healthChan := make(chan *healthcheck.ControllerHeartbeat, 10)
defer close(healthChan)
Expand Down Expand Up @@ -140,7 +141,8 @@ func (kr *KubeRouter) Run() error {
}

if kr.Config.RunRouter {
nrc, err := routing.NewNetworkRoutingController(kr.Client, kr.Config, nodeInformer, svcInformer, epInformer)
nrc, err := routing.NewNetworkRoutingController(kr.Client, kr.Config,
nodeInformer, svcInformer, epInformer, &ipsetMutex)
if err != nil {
return errors.New("Failed to create network routing controller: " + err.Error())
}
Expand All @@ -162,7 +164,7 @@ func (kr *KubeRouter) Run() error {

if kr.Config.RunServiceProxy {
nsc, err := proxy.NewNetworkServicesController(kr.Client, kr.Config,
svcInformer, epInformer, podInformer)
svcInformer, epInformer, podInformer, &ipsetMutex)
if err != nil {
return errors.New("Failed to create network services controller: " + err.Error())
}
Expand All @@ -183,7 +185,7 @@ func (kr *KubeRouter) Run() error {

if kr.Config.RunFirewall {
npc, err := netpol.NewNetworkPolicyController(kr.Client,
kr.Config, podInformer, npInformer, nsInformer)
kr.Config, podInformer, npInformer, nsInformer, &ipsetMutex)
if err != nil {
return errors.New("Failed to create network policy controller: " + err.Error())
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/controllers/netpol/network_policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type NetworkPolicyController struct {
MetricsEnabled bool
healthChan chan<- *healthcheck.ControllerHeartbeat
fullSyncRequestChan chan struct{}
ipsetMutex *sync.Mutex

ipSetHandler *utils.IPSet

Expand Down Expand Up @@ -566,6 +567,13 @@ func (npc *NetworkPolicyController) Cleanup() {
}

// delete all ipsets
klog.V(1).Infof("Attempting to attain ipset mutex lock")
npc.ipsetMutex.Lock()
klog.V(1).Infof("Attained ipset mutex lock, continuing...")
defer func() {
npc.ipsetMutex.Unlock()
klog.V(1).Infof("Returned ipset mutex lock")
}()
ipset, err := utils.NewIPSet(false)
if err != nil {
klog.Errorf("Failed to clean up ipsets: " + err.Error())
Expand All @@ -584,8 +592,8 @@ func (npc *NetworkPolicyController) Cleanup() {
// NewNetworkPolicyController returns new NetworkPolicyController object
func NewNetworkPolicyController(clientset kubernetes.Interface,
config *options.KubeRouterConfig, podInformer cache.SharedIndexInformer,
npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer) (*NetworkPolicyController, error) {
npc := NetworkPolicyController{}
npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer, ipsetMutex *sync.Mutex) (*NetworkPolicyController, error) {
npc := NetworkPolicyController{ipsetMutex: ipsetMutex}

// Creating a single-item buffered channel to ensure that we only keep a single full sync request at a time,
// additional requests would be pointless to queue since after the first one was processed the system would already
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/netpol/network_policy_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -522,7 +523,7 @@ func TestNetworkPolicyController(t *testing.T) {
_, podInformer, nsInformer, netpolInformer := newFakeInformersFromClient(client)
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
_, err := NewNetworkPolicyController(client, test.config, podInformer, netpolInformer, nsInformer)
_, err := NewNetworkPolicyController(client, test.config, podInformer, netpolInformer, nsInformer, &sync.Mutex{})
if err == nil && test.expectError {
t.Error("This config should have failed, but it was successful instead")
} else if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/controllers/netpol/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo
klog.V(2).Infof("Syncing network policy chains took %v", endTime)
}()

klog.V(1).Infof("Attempting to attain ipset mutex lock")
npc.ipsetMutex.Lock()
klog.V(1).Infof("Attained ipset mutex lock, continuing...")
defer func() {
npc.ipsetMutex.Unlock()
klog.V(1).Infof("Returned ipset mutex lock")
}()

ipset, err := utils.NewIPSet(false)
if err != nil {
return nil, nil, err
Expand Down
22 changes: 19 additions & 3 deletions pkg/controllers/proxy/network_services_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"syscall"
"time"

"golang.org/x/net/context"

"github.com/cloudnativelabs/kube-router/pkg/cri"
"github.com/cloudnativelabs/kube-router/pkg/healthcheck"
"github.com/cloudnativelabs/kube-router/pkg/metrics"
Expand All @@ -28,7 +30,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/vishvananda/netlink"
"github.com/vishvananda/netns"
"golang.org/x/net/context"
api "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -219,6 +220,7 @@ type NetworkServicesController struct {
ln LinuxNetworking
readyForUpdates bool
ProxyFirewallSetup *sync.Cond
ipsetMutex *sync.Mutex

// Map of ipsets that we use.
ipsetMap map[string]*utils.Set
Expand Down Expand Up @@ -627,6 +629,13 @@ func (nsc *NetworkServicesController) cleanupIpvsFirewall() {
}

// Clear ipsets.
klog.V(1).Infof("Attempting to attain ipset mutex lock")
nsc.ipsetMutex.Lock()
klog.V(1).Infof("Attained ipset mutex lock, continuing...")
defer func() {
nsc.ipsetMutex.Unlock()
klog.V(1).Infof("Returned ipset mutex lock")
}()
ipSetHandler, err := utils.NewIPSet(false)
if err != nil {
klog.Errorf("Failed to initialize ipset handler: %s", err.Error())
Expand All @@ -653,6 +662,13 @@ func (nsc *NetworkServicesController) syncIpvsFirewall() error {
- update ipsets based on currently active IPVS services
*/
var err error
klog.V(1).Infof("Attempting to attain ipset mutex lock")
nsc.ipsetMutex.Lock()
klog.V(1).Infof("Attained ipset mutex lock, continuing...")
defer func() {
nsc.ipsetMutex.Unlock()
klog.V(1).Infof("Returned ipset mutex lock")
}()

localIPsIPSet := nsc.ipsetMap[localIPsIPSetName]

Expand Down Expand Up @@ -2507,15 +2523,15 @@ func (nsc *NetworkServicesController) handleServiceDelete(obj interface{}) {
// NewNetworkServicesController returns NetworkServicesController object
func NewNetworkServicesController(clientset kubernetes.Interface,
config *options.KubeRouterConfig, svcInformer cache.SharedIndexInformer,
epInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer) (*NetworkServicesController, error) {
epInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer, ipsetMutex *sync.Mutex) (*NetworkServicesController, error) {

var err error
ln, err := newLinuxNetworking()
if err != nil {
return nil, err
}

nsc := NetworkServicesController{ln: ln}
nsc := NetworkServicesController{ln: ln, ipsetMutex: ipsetMutex}

if config.MetricsEnabled {
//Register the metrics for this controller
Expand Down
19 changes: 17 additions & 2 deletions pkg/controllers/routing/network_routes_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ type NetworkRoutingController struct {
overrideNextHop bool
podCidr string
CNIFirewallSetup *sync.Cond
ipsetMutex *sync.Mutex

nodeLister cache.Indexer
svcLister cache.Indexer
Expand Down Expand Up @@ -640,6 +641,13 @@ func (nrc *NetworkRoutingController) Cleanup() {
}

// delete all ipsets created by kube-router
klog.V(1).Infof("Attempting to attain ipset mutex lock")
nrc.ipsetMutex.Lock()
klog.V(1).Infof("Attained ipset mutex lock, continuing...")
defer func() {
nrc.ipsetMutex.Unlock()
klog.V(1).Infof("Returned ipset mutex lock")
}()
ipset, err := utils.NewIPSet(nrc.isIpv6)
if err != nil {
klog.Errorf("Failed to clean up ipsets: " + err.Error())
Expand All @@ -662,6 +670,13 @@ func (nrc *NetworkRoutingController) syncNodeIPSets() error {
metrics.ControllerRoutesSyncTime.Observe(time.Since(start).Seconds())
}
}()
klog.V(1).Infof("Attempting to attain ipset mutex lock")
nrc.ipsetMutex.Lock()
klog.V(1).Infof("Attained ipset mutex lock, continuing...")
defer func() {
nrc.ipsetMutex.Unlock()
klog.V(1).Infof("Returned ipset mutex lock")
}()

nodes := nrc.nodeLister.List()

Expand Down Expand Up @@ -983,11 +998,11 @@ func (nrc *NetworkRoutingController) startBgpServer(grpcServer bool) error {
func NewNetworkRoutingController(clientset kubernetes.Interface,
kubeRouterConfig *options.KubeRouterConfig,
nodeInformer cache.SharedIndexInformer, svcInformer cache.SharedIndexInformer,
epInformer cache.SharedIndexInformer) (*NetworkRoutingController, error) {
epInformer cache.SharedIndexInformer, ipsetMutex *sync.Mutex) (*NetworkRoutingController, error) {

var err error

nrc := NetworkRoutingController{}
nrc := NetworkRoutingController{ipsetMutex: ipsetMutex}
if kubeRouterConfig.MetricsEnabled {
//Register the metrics for this controller
prometheus.MustRegister(metrics.ControllerBGPadvertisementsReceived)
Expand Down

0 comments on commit fdfa27d

Please sign in to comment.