diff --git a/pkg/cmd/kube-router.go b/pkg/cmd/kube-router.go index fb73c7c67..82e3e0234 100644 --- a/pkg/cmd/kube-router.go +++ b/pkg/cmd/kube-router.go @@ -72,6 +72,7 @@ func CleanupConfigAndExit() { // Run starts the controllers and waits forever till we get SIGINT or SIGTERM func (kr *KubeRouter) Run() error { var err error + var ipsetMutex sync.Mutex var wg sync.WaitGroup healthChan := make(chan *healthcheck.ControllerHeartbeat, 10) defer close(healthChan) @@ -140,7 +141,8 @@ func (kr *KubeRouter) Run() error { } if kr.Config.RunRouter { - nrc, err := routing.NewNetworkRoutingController(kr.Client, kr.Config, nodeInformer, svcInformer, epInformer) + nrc, err := routing.NewNetworkRoutingController(kr.Client, kr.Config, + nodeInformer, svcInformer, epInformer, &ipsetMutex) if err != nil { return errors.New("Failed to create network routing controller: " + err.Error()) } @@ -162,7 +164,7 @@ func (kr *KubeRouter) Run() error { if kr.Config.RunServiceProxy { nsc, err := proxy.NewNetworkServicesController(kr.Client, kr.Config, - svcInformer, epInformer, podInformer) + svcInformer, epInformer, podInformer, &ipsetMutex) if err != nil { return errors.New("Failed to create network services controller: " + err.Error()) } @@ -183,7 +185,7 @@ func (kr *KubeRouter) Run() error { if kr.Config.RunFirewall { npc, err := netpol.NewNetworkPolicyController(kr.Client, - kr.Config, podInformer, npInformer, nsInformer) + kr.Config, podInformer, npInformer, nsInformer, &ipsetMutex) if err != nil { return errors.New("Failed to create network policy controller: " + err.Error()) } diff --git a/pkg/controllers/netpol/network_policy_controller.go b/pkg/controllers/netpol/network_policy_controller.go index 8bd1a8608..d25dd7727 100644 --- a/pkg/controllers/netpol/network_policy_controller.go +++ b/pkg/controllers/netpol/network_policy_controller.go @@ -57,6 +57,7 @@ type NetworkPolicyController struct { MetricsEnabled bool healthChan chan<- *healthcheck.ControllerHeartbeat fullSyncRequestChan chan struct{} + ipsetMutex *sync.Mutex ipSetHandler *utils.IPSet @@ -566,6 +567,13 @@ func (npc *NetworkPolicyController) Cleanup() { } // delete all ipsets + klog.V(1).Infof("Attempting to attain ipset mutex lock") + npc.ipsetMutex.Lock() + klog.V(1).Infof("Attained ipset mutex lock, continuing...") + defer func() { + npc.ipsetMutex.Unlock() + klog.V(1).Infof("Returned ipset mutex lock") + }() ipset, err := utils.NewIPSet(false) if err != nil { klog.Errorf("Failed to clean up ipsets: " + err.Error()) @@ -584,8 +592,8 @@ func (npc *NetworkPolicyController) Cleanup() { // NewNetworkPolicyController returns new NetworkPolicyController object func NewNetworkPolicyController(clientset kubernetes.Interface, config *options.KubeRouterConfig, podInformer cache.SharedIndexInformer, - npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer) (*NetworkPolicyController, error) { - npc := NetworkPolicyController{} + npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer, ipsetMutex *sync.Mutex) (*NetworkPolicyController, error) { + npc := NetworkPolicyController{ipsetMutex: ipsetMutex} // Creating a single-item buffered channel to ensure that we only keep a single full sync request at a time, // additional requests would be pointless to queue since after the first one was processed the system would already diff --git a/pkg/controllers/netpol/network_policy_controller_test.go b/pkg/controllers/netpol/network_policy_controller_test.go index d74d87f1a..c98dff10f 100644 --- a/pkg/controllers/netpol/network_policy_controller_test.go +++ b/pkg/controllers/netpol/network_policy_controller_test.go @@ -4,6 +4,7 @@ import ( "context" "net" "strings" + "sync" "testing" "time" @@ -522,7 +523,7 @@ func TestNetworkPolicyController(t *testing.T) { _, podInformer, nsInformer, netpolInformer := newFakeInformersFromClient(client) for _, test := range testCases { t.Run(test.name, func(t *testing.T) { - _, err := NewNetworkPolicyController(client, test.config, podInformer, netpolInformer, nsInformer) + _, err := NewNetworkPolicyController(client, test.config, podInformer, netpolInformer, nsInformer, &sync.Mutex{}) if err == nil && test.expectError { t.Error("This config should have failed, but it was successful instead") } else if err != nil { diff --git a/pkg/controllers/netpol/policy.go b/pkg/controllers/netpol/policy.go index ec7817e31..45987d3be 100644 --- a/pkg/controllers/netpol/policy.go +++ b/pkg/controllers/netpol/policy.go @@ -76,6 +76,14 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo klog.V(2).Infof("Syncing network policy chains took %v", endTime) }() + klog.V(1).Infof("Attempting to attain ipset mutex lock") + npc.ipsetMutex.Lock() + klog.V(1).Infof("Attained ipset mutex lock, continuing...") + defer func() { + npc.ipsetMutex.Unlock() + klog.V(1).Infof("Returned ipset mutex lock") + }() + ipset, err := utils.NewIPSet(false) if err != nil { return nil, nil, err diff --git a/pkg/controllers/proxy/network_services_controller.go b/pkg/controllers/proxy/network_services_controller.go index 687b44322..790336ce7 100644 --- a/pkg/controllers/proxy/network_services_controller.go +++ b/pkg/controllers/proxy/network_services_controller.go @@ -17,6 +17,8 @@ import ( "syscall" "time" + "golang.org/x/net/context" + "github.com/cloudnativelabs/kube-router/pkg/cri" "github.com/cloudnativelabs/kube-router/pkg/healthcheck" "github.com/cloudnativelabs/kube-router/pkg/metrics" @@ -28,7 +30,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/vishvananda/netlink" "github.com/vishvananda/netns" - "golang.org/x/net/context" api "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" @@ -219,6 +220,7 @@ type NetworkServicesController struct { ln LinuxNetworking readyForUpdates bool ProxyFirewallSetup *sync.Cond + ipsetMutex *sync.Mutex // Map of ipsets that we use. ipsetMap map[string]*utils.Set @@ -627,6 +629,13 @@ func (nsc *NetworkServicesController) cleanupIpvsFirewall() { } // Clear ipsets. + klog.V(1).Infof("Attempting to attain ipset mutex lock") + nsc.ipsetMutex.Lock() + klog.V(1).Infof("Attained ipset mutex lock, continuing...") + defer func() { + nsc.ipsetMutex.Unlock() + klog.V(1).Infof("Returned ipset mutex lock") + }() ipSetHandler, err := utils.NewIPSet(false) if err != nil { klog.Errorf("Failed to initialize ipset handler: %s", err.Error()) @@ -653,6 +662,13 @@ func (nsc *NetworkServicesController) syncIpvsFirewall() error { - update ipsets based on currently active IPVS services */ var err error + klog.V(1).Infof("Attempting to attain ipset mutex lock") + nsc.ipsetMutex.Lock() + klog.V(1).Infof("Attained ipset mutex lock, continuing...") + defer func() { + nsc.ipsetMutex.Unlock() + klog.V(1).Infof("Returned ipset mutex lock") + }() localIPsIPSet := nsc.ipsetMap[localIPsIPSetName] @@ -2507,7 +2523,7 @@ func (nsc *NetworkServicesController) handleServiceDelete(obj interface{}) { // NewNetworkServicesController returns NetworkServicesController object func NewNetworkServicesController(clientset kubernetes.Interface, config *options.KubeRouterConfig, svcInformer cache.SharedIndexInformer, - epInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer) (*NetworkServicesController, error) { + epInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer, ipsetMutex *sync.Mutex) (*NetworkServicesController, error) { var err error ln, err := newLinuxNetworking() @@ -2515,7 +2531,7 @@ func NewNetworkServicesController(clientset kubernetes.Interface, return nil, err } - nsc := NetworkServicesController{ln: ln} + nsc := NetworkServicesController{ln: ln, ipsetMutex: ipsetMutex} if config.MetricsEnabled { //Register the metrics for this controller diff --git a/pkg/controllers/routing/network_routes_controller.go b/pkg/controllers/routing/network_routes_controller.go index 5d70fd0ca..dd10bf743 100644 --- a/pkg/controllers/routing/network_routes_controller.go +++ b/pkg/controllers/routing/network_routes_controller.go @@ -116,6 +116,7 @@ type NetworkRoutingController struct { overrideNextHop bool podCidr string CNIFirewallSetup *sync.Cond + ipsetMutex *sync.Mutex nodeLister cache.Indexer svcLister cache.Indexer @@ -640,6 +641,13 @@ func (nrc *NetworkRoutingController) Cleanup() { } // delete all ipsets created by kube-router + klog.V(1).Infof("Attempting to attain ipset mutex lock") + nrc.ipsetMutex.Lock() + klog.V(1).Infof("Attained ipset mutex lock, continuing...") + defer func() { + nrc.ipsetMutex.Unlock() + klog.V(1).Infof("Returned ipset mutex lock") + }() ipset, err := utils.NewIPSet(nrc.isIpv6) if err != nil { klog.Errorf("Failed to clean up ipsets: " + err.Error()) @@ -662,6 +670,13 @@ func (nrc *NetworkRoutingController) syncNodeIPSets() error { metrics.ControllerRoutesSyncTime.Observe(time.Since(start).Seconds()) } }() + klog.V(1).Infof("Attempting to attain ipset mutex lock") + nrc.ipsetMutex.Lock() + klog.V(1).Infof("Attained ipset mutex lock, continuing...") + defer func() { + nrc.ipsetMutex.Unlock() + klog.V(1).Infof("Returned ipset mutex lock") + }() nodes := nrc.nodeLister.List() @@ -983,11 +998,11 @@ func (nrc *NetworkRoutingController) startBgpServer(grpcServer bool) error { func NewNetworkRoutingController(clientset kubernetes.Interface, kubeRouterConfig *options.KubeRouterConfig, nodeInformer cache.SharedIndexInformer, svcInformer cache.SharedIndexInformer, - epInformer cache.SharedIndexInformer) (*NetworkRoutingController, error) { + epInformer cache.SharedIndexInformer, ipsetMutex *sync.Mutex) (*NetworkRoutingController, error) { var err error - nrc := NetworkRoutingController{} + nrc := NetworkRoutingController{ipsetMutex: ipsetMutex} if kubeRouterConfig.MetricsEnabled { //Register the metrics for this controller prometheus.MustRegister(metrics.ControllerBGPadvertisementsReceived)