diff --git a/.changelog/12553.txt b/.changelog/12553.txt new file mode 100644 index 000000000000..a0da82808510 --- /dev/null +++ b/.changelog/12553.txt @@ -0,0 +1,3 @@ +```release-note:bug +csi: Fixed bug where accessing plugins was subject to a data race +``` diff --git a/client/pluginmanager/csimanager/manager.go b/client/pluginmanager/csimanager/manager.go index 11dc66b4c19e..96e26ac01018 100644 --- a/client/pluginmanager/csimanager/manager.go +++ b/client/pluginmanager/csimanager/manager.go @@ -54,9 +54,10 @@ func New(config *Config) Manager { } type csiManager struct { - // instances should only be accessed from the run() goroutine and the shutdown - // fn. It is a map of PluginType : [PluginName : instanceManager] - instances map[string]map[string]*instanceManager + // instances should only be accessed after locking with instancesLock. + // It is a map of PluginType : [PluginName : *instanceManager] + instances map[string]map[string]*instanceManager + instancesLock sync.RWMutex registry dynamicplugins.Registry logger hclog.Logger @@ -75,6 +76,8 @@ func (c *csiManager) PluginManager() pluginmanager.PluginManager { } func (c *csiManager) MounterForPlugin(ctx context.Context, pluginID string) (VolumeMounter, error) { + c.instancesLock.RLock() + defer c.instancesLock.RUnlock() nodePlugins, hasAnyNodePlugins := c.instances["csi-node"] if !hasAnyNodePlugins { return nil, fmt.Errorf("no storage node plugins found") @@ -118,6 +121,10 @@ func (c *csiManager) runLoop() { // managers against those in the registry. we primarily will use update // events from the registry. 
func (c *csiManager) resyncPluginsFromRegistry(ptype string) { + + c.instancesLock.Lock() + defer c.instancesLock.Unlock() + plugins := c.registry.ListPlugins(ptype) seen := make(map[string]struct{}, len(plugins)) @@ -150,6 +157,9 @@ func (c *csiManager) handlePluginEvent(event *dynamicplugins.PluginUpdateEvent) "plugin_id", event.Info.Name, "plugin_alloc_id", event.Info.AllocID) + c.instancesLock.Lock() + defer c.instancesLock.Unlock() + switch event.EventType { case dynamicplugins.EventTypeRegistered: c.ensureInstance(event.Info) @@ -163,6 +173,7 @@ func (c *csiManager) handlePluginEvent(event *dynamicplugins.PluginUpdateEvent) // Ensure we have an instance manager for the plugin and add it to // the CSI manager's tracking table for that plugin type. +// Assumes the caller holds c.instancesLock for writing. func (c *csiManager) ensureInstance(plugin *dynamicplugins.PluginInfo) { name := plugin.Name ptype := plugin.Type @@ -177,6 +188,7 @@ func (c *csiManager) ensureInstance(plugin *dynamicplugins.PluginInfo) { // Shut down the instance manager for a plugin and remove it from // the CSI manager's tracking table for that plugin type. +// Assumes the caller holds c.instancesLock for writing. func (c *csiManager) ensureNoInstance(plugin *dynamicplugins.PluginInfo) { name := plugin.Name ptype := plugin.Type @@ -190,6 +202,7 @@ func (c *csiManager) ensureNoInstance(plugin *dynamicplugins.PluginInfo) { // Get the instance managers table for a specific plugin type, // ensuring it's been initialized if it doesn't exist. +// Assumes the caller holds c.instancesLock for writing.
func (c *csiManager) instancesForType(ptype string) map[string]*instanceManager { pluginMap, ok := c.instances[ptype] if !ok { diff --git a/client/pluginmanager/csimanager/manager_test.go b/client/pluginmanager/csimanager/manager_test.go index f6c3f381decd..9d37756a2c2e 100644 --- a/client/pluginmanager/csimanager/manager_test.go +++ b/client/pluginmanager/csimanager/manager_test.go @@ -65,13 +65,8 @@ func TestManager_RegisterPlugin(t *testing.T) { pm.Run() require.Eventually(t, func() bool { - pmap, ok := pm.instances[fakePlugin.Type] - if !ok { - return false - } - - _, ok = pmap[fakePlugin.Name] - return ok + im := instanceManagerByTypeAndName(pm, fakePlugin.Type, fakePlugin.Name) + return im != nil }, 5*time.Second, 10*time.Millisecond) } @@ -98,16 +93,16 @@ func TestManager_DeregisterPlugin(t *testing.T) { pm.Run() require.Eventually(t, func() bool { - _, ok := pm.instances[fakePlugin.Type][fakePlugin.Name] - return ok + im := instanceManagerByTypeAndName(pm, fakePlugin.Type, fakePlugin.Name) + return im != nil }, 5*time.Second, 10*time.Millisecond) err = registry.DeregisterPlugin(fakePlugin.Type, fakePlugin.Name) require.Nil(t, err) require.Eventually(t, func() bool { - _, ok := pm.instances[fakePlugin.Type][fakePlugin.Name] - return !ok + im := instanceManagerByTypeAndName(pm, fakePlugin.Type, fakePlugin.Name) + return im == nil }, 5*time.Second, 10*time.Millisecond) } @@ -142,20 +137,30 @@ func TestManager_MultiplePlugins(t *testing.T) { pm.Run() require.Eventually(t, func() bool { - _, ok := pm.instances[fakePlugin.Type][fakePlugin.Name] - return ok + im := instanceManagerByTypeAndName(pm, fakePlugin.Type, fakePlugin.Name) + return im != nil }, 5*time.Second, 10*time.Millisecond) require.Eventually(t, func() bool { - _, ok := pm.instances[fakeNodePlugin.Type][fakeNodePlugin.Name] - return ok + im := instanceManagerByTypeAndName(pm, fakeNodePlugin.Type, fakeNodePlugin.Name) + return im != nil }, 5*time.Second, 10*time.Millisecond) err = 
registry.DeregisterPlugin(fakePlugin.Type, fakePlugin.Name) require.Nil(t, err) require.Eventually(t, func() bool { - _, ok := pm.instances[fakePlugin.Type][fakePlugin.Name] - return !ok + im := instanceManagerByTypeAndName(pm, fakePlugin.Type, fakePlugin.Name) + return im == nil }, 5*time.Second, 10*time.Millisecond) } + +// instanceManagerByTypeAndName is a test helper that returns the +// instance manager for the plugin, or nil if none is registered. It +// holds instancesLock for reading, as csiManager does internally. +func instanceManagerByTypeAndName(mgr *csiManager, pluginType, pluginName string) *instanceManager { + mgr.instancesLock.RLock() + defer mgr.instancesLock.RUnlock() + im, _ := mgr.instances[pluginType][pluginName] + return im +}