diff --git a/.changelog/12553.txt b/.changelog/12553.txt new file mode 100644 index 000000000000..a0da82808510 --- /dev/null +++ b/.changelog/12553.txt @@ -0,0 +1,3 @@ +```release-note:bug +csi: Fixed bug where accessing plugins was subject to a data race +``` diff --git a/client/pluginmanager/csimanager/manager.go b/client/pluginmanager/csimanager/manager.go index d2b5bf78d50e..a5b2f51cd446 100644 --- a/client/pluginmanager/csimanager/manager.go +++ b/client/pluginmanager/csimanager/manager.go @@ -54,9 +54,10 @@ func New(config *Config) Manager { } type csiManager struct { - // instances should only be accessed from the run() goroutine and the shutdown - // fn. It is a map of PluginType : [PluginName : *instanceManager] - instances map[string]map[string]*instanceManager + // instances should only be accessed after locking with instancesLock. + // It is a map of PluginType : [PluginName : *instanceManager] + instances map[string]map[string]*instanceManager + instancesLock sync.RWMutex registry dynamicplugins.Registry logger hclog.Logger @@ -75,6 +76,8 @@ func (c *csiManager) PluginManager() pluginmanager.PluginManager { } func (c *csiManager) MounterForPlugin(ctx context.Context, pluginID string) (VolumeMounter, error) { + c.instancesLock.RLock() + defer c.instancesLock.RUnlock() nodePlugins, hasAnyNodePlugins := c.instances["csi-node"] if !hasAnyNodePlugins { return nil, fmt.Errorf("no storage node plugins found") @@ -118,6 +121,10 @@ func (c *csiManager) runLoop() { // managers against those in the registry. we primarily will use update // events from the registry. func (c *csiManager) resyncPluginsFromRegistry(ptype string) { + + c.instancesLock.Lock() + defer c.instancesLock.Unlock() + plugins := c.registry.ListPlugins(ptype) seen := make(map[string]struct{}, len(plugins)) @@ -150,6 +157,9 @@ func (c *csiManager) handlePluginEvent(event *dynamicplugins.PluginUpdateEvent) "plugin_id", event.Info.Name, "plugin_alloc_id", event.Info.AllocID) + c.instancesLock.Lock() + defer c.instancesLock.Unlock() + switch event.EventType { case dynamicplugins.EventTypeRegistered: c.ensureInstance(event.Info) @@ -163,6 +173,7 @@ func (c *csiManager) handlePluginEvent(event *dynamicplugins.PluginUpdateEvent) // Ensure we have an instance manager for the plugin and add it to // the CSI manager's tracking table for that plugin type. +// Assumes that c.instances has been locked. func (c *csiManager) ensureInstance(plugin *dynamicplugins.PluginInfo) { name := plugin.Name ptype := plugin.Type @@ -185,6 +196,7 @@ func (c *csiManager) ensureInstance(plugin *dynamicplugins.PluginInfo) { // Shut down the instance manager for a plugin and remove it from // the CSI manager's tracking table for that plugin type. +// Assumes that c.instances has been locked. func (c *csiManager) ensureNoInstance(plugin *dynamicplugins.PluginInfo) { name := plugin.Name ptype := plugin.Type @@ -200,6 +212,7 @@ func (c *csiManager) ensureNoInstance(plugin *dynamicplugins.PluginInfo) { // Get the instance managers table for a specific plugin type, // ensuring it's been initialized if it doesn't exist. +// Assumes that c.instances has been locked. func (c *csiManager) instancesForType(ptype string) map[string]*instanceManager { pluginMap, ok := c.instances[ptype] if !ok { diff --git a/client/pluginmanager/csimanager/manager_test.go b/client/pluginmanager/csimanager/manager_test.go index 6ece1df469d7..ee469d9d171d 100644 --- a/client/pluginmanager/csimanager/manager_test.go +++ b/client/pluginmanager/csimanager/manager_test.go @@ -62,13 +62,8 @@ func TestManager_RegisterPlugin(t *testing.T) { pm.Run() require.Eventually(t, func() bool { - pmap, ok := pm.instances[plugin.Type] - if !ok { - return false - } - - _, ok = pmap[plugin.Name] - return ok + im := instanceManagerByTypeAndName(pm, plugin.Type, plugin.Name) + return im != nil }, 5*time.Second, 10*time.Millisecond) } @@ -85,16 +80,16 @@ func TestManager_DeregisterPlugin(t *testing.T) { pm.Run() require.Eventually(t, func() bool { - _, ok := pm.instances[plugin.Type][plugin.Name] - return ok + im := instanceManagerByTypeAndName(pm, plugin.Type, plugin.Name) + return im != nil }, 5*time.Second, 10*time.Millisecond) err = registry.DeregisterPlugin(plugin.Type, plugin.Name, "alloc-0") require.NoError(t, err) require.Eventually(t, func() bool { - _, ok := pm.instances[plugin.Type][plugin.Name] - return !ok + im := instanceManagerByTypeAndName(pm, plugin.Type, plugin.Name) + return im == nil }, 5*time.Second, 10*time.Millisecond) } @@ -119,21 +114,21 @@ func TestManager_MultiplePlugins(t *testing.T) { pm.Run() require.Eventually(t, func() bool { - _, ok := pm.instances[controllerPlugin.Type][controllerPlugin.Name] - return ok + im := instanceManagerByTypeAndName(pm, controllerPlugin.Type, controllerPlugin.Name) + return im != nil }, 5*time.Second, 10*time.Millisecond) require.Eventually(t, func() bool { - _, ok := pm.instances[nodePlugin.Type][nodePlugin.Name] - return ok + im := instanceManagerByTypeAndName(pm, nodePlugin.Type, nodePlugin.Name) + return im != nil }, 5*time.Second, 10*time.Millisecond) err = registry.DeregisterPlugin(controllerPlugin.Type, controllerPlugin.Name, "alloc-0") require.NoError(t, err) require.Eventually(t, func() bool { - _, ok := pm.instances[controllerPlugin.Type][controllerPlugin.Name] - return !ok + im := instanceManagerByTypeAndName(pm, controllerPlugin.Type, controllerPlugin.Name) + return im == nil }, 5*time.Second, 10*time.Millisecond) } @@ -154,8 +149,9 @@ func TestManager_ConcurrentPlugins(t *testing.T) { require.NoError(t, registry.RegisterPlugin(plugin0)) require.NoError(t, registry.RegisterPlugin(plugin1)) require.Eventuallyf(t, func() bool { - im, _ := pm.instances[plugin0.Type][plugin0.Name] - return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" && + im := instanceManagerByTypeAndName(pm, plugin0.Type, plugin0.Name) + return im != nil && + im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" && im.allocID == "alloc-1" }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin did not become active plugin") @@ -172,8 +168,9 @@ func TestManager_ConcurrentPlugins(t *testing.T) { pm.Run() require.Eventuallyf(t, func() bool { - im, _ := pm.instances[plugin0.Type][plugin0.Name] - return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" && + im := instanceManagerByTypeAndName(pm, plugin0.Type, plugin0.Name) + return im != nil && + im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" && im.allocID == "alloc-1" }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin was not active after state reload") @@ -183,8 +180,9 @@ func TestManager_ConcurrentPlugins(t *testing.T) { require.NoError(t, registry.RegisterPlugin(plugin2)) require.Eventuallyf(t, func() bool { - im, _ := pm.instances[plugin0.Type][plugin0.Name] - return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-2/csi.sock" && + im := instanceManagerByTypeAndName(pm, plugin0.Type, plugin0.Name) + return im != nil && + im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-2/csi.sock" && im.allocID == "alloc-2" }, 5*time.Second, 10*time.Millisecond, "alloc-2 plugin was not active after replacement") @@ -206,21 +204,32 @@ func TestManager_ConcurrentPlugins(t *testing.T) { require.NoError(t, registry.RegisterPlugin(plugin1)) require.Eventuallyf(t, func() bool { - im, _ := pm.instances[plugin0.Type][plugin0.Name] - return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" && + im := instanceManagerByTypeAndName(pm, plugin0.Type, plugin0.Name) + return im != nil && + im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" && im.allocID == "alloc-1" }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin did not become active plugin") registry.DeregisterPlugin(dynamicplugins.PluginTypeCSINode, "my-plugin", "alloc-0") require.Eventuallyf(t, func() bool { - im, _ := pm.instances[plugin0.Type][plugin0.Name] + im := instanceManagerByTypeAndName(pm, plugin0.Type, plugin0.Name) return im != nil && im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin should still be active plugin") }) } +// instanceManagerByTypeAndName is a test helper to get the instance +// manager for the plugin, protected by the lock that the csiManager +// will normally do internally +func instanceManagerByTypeAndName(mgr *csiManager, pluginType, pluginName string) *instanceManager { + mgr.instancesLock.RLock() + defer mgr.instancesLock.RUnlock() + im, _ := mgr.instances[pluginType][pluginName] + return im +} + // MemDB implements a StateDB that stores data in memory and should only be // used for testing. All methods are safe for concurrent use. This is a // partial implementation of the MemDB in the client/state package, copied