Skip to content

Commit

Permalink
CSI: fix data race in plugin manager
Browse files Browse the repository at this point in the history
The plugin manager for CSI hands out instances of a plugin for callers
that need to mount a volume. The `MounterForPlugin` method accesses
the internal instances map without a lock, and can be called
concurrently from outside the plugin manager's main run-loop.

The original commit for the instances map included a warning that it
needed to be accessed only from the main loop but that comment was
unfortunately ignored shortly thereafter, so this bug has existed in
the code for a couple years without being detected until we ran tests
with `-race` in #12098. Lesson learned here: comments make for lousy
enforcement of invariants!
  • Loading branch information
tgross committed Apr 12, 2022
1 parent 86ca8f7 commit 109666a
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 29 deletions.
19 changes: 16 additions & 3 deletions client/pluginmanager/csimanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ func New(config *Config) Manager {
}

type csiManager struct {
// instances should only be accessed from the run() goroutine and the shutdown
// fn. It is a map of PluginType : [PluginName : *instanceManager]
instances map[string]map[string]*instanceManager
// instances should only be accessed after locking with instancesLock.
// It is a map of PluginType : [PluginName : *instanceManager]
instances map[string]map[string]*instanceManager
instancesLock sync.RWMutex

registry dynamicplugins.Registry
logger hclog.Logger
Expand All @@ -75,6 +76,8 @@ func (c *csiManager) PluginManager() pluginmanager.PluginManager {
}

func (c *csiManager) MounterForPlugin(ctx context.Context, pluginID string) (VolumeMounter, error) {
c.instancesLock.RLock()
defer c.instancesLock.RUnlock()
nodePlugins, hasAnyNodePlugins := c.instances["csi-node"]
if !hasAnyNodePlugins {
return nil, fmt.Errorf("no storage node plugins found")
Expand Down Expand Up @@ -118,6 +121,10 @@ func (c *csiManager) runLoop() {
// managers against those in the registry. we primarily will use update
// events from the registry.
func (c *csiManager) resyncPluginsFromRegistry(ptype string) {

c.instancesLock.Lock()
defer c.instancesLock.Unlock()

plugins := c.registry.ListPlugins(ptype)
seen := make(map[string]struct{}, len(plugins))

Expand Down Expand Up @@ -150,6 +157,9 @@ func (c *csiManager) handlePluginEvent(event *dynamicplugins.PluginUpdateEvent)
"plugin_id", event.Info.Name,
"plugin_alloc_id", event.Info.AllocID)

c.instancesLock.Lock()
defer c.instancesLock.Unlock()

switch event.EventType {
case dynamicplugins.EventTypeRegistered:
c.ensureInstance(event.Info)
Expand All @@ -163,6 +173,7 @@ func (c *csiManager) handlePluginEvent(event *dynamicplugins.PluginUpdateEvent)

// Ensure we have an instance manager for the plugin and add it to
// the CSI manager's tracking table for that plugin type.
// Assumes that c.instances has been locked.
func (c *csiManager) ensureInstance(plugin *dynamicplugins.PluginInfo) {
name := plugin.Name
ptype := plugin.Type
Expand All @@ -185,6 +196,7 @@ func (c *csiManager) ensureInstance(plugin *dynamicplugins.PluginInfo) {

// Shut down the instance manager for a plugin and remove it from
// the CSI manager's tracking table for that plugin type.
// Assumes that c.instances has been locked.
func (c *csiManager) ensureNoInstance(plugin *dynamicplugins.PluginInfo) {
name := plugin.Name
ptype := plugin.Type
Expand All @@ -200,6 +212,7 @@ func (c *csiManager) ensureNoInstance(plugin *dynamicplugins.PluginInfo) {

// Get the instance managers table for a specific plugin type,
// ensuring it's been initialized if it doesn't exist.
// Assumes that c.instances has been locked.
func (c *csiManager) instancesForType(ptype string) map[string]*instanceManager {
pluginMap, ok := c.instances[ptype]
if !ok {
Expand Down
61 changes: 35 additions & 26 deletions client/pluginmanager/csimanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,8 @@ func TestManager_RegisterPlugin(t *testing.T) {
pm.Run()

require.Eventually(t, func() bool {
pmap, ok := pm.instances[plugin.Type]
if !ok {
return false
}

_, ok = pmap[plugin.Name]
return ok
im := instanceManagerByTypeAndName(pm, plugin.Type, plugin.Name)
return im != nil
}, 5*time.Second, 10*time.Millisecond)
}

Expand All @@ -85,16 +80,16 @@ func TestManager_DeregisterPlugin(t *testing.T) {
pm.Run()

require.Eventually(t, func() bool {
_, ok := pm.instances[plugin.Type][plugin.Name]
return ok
im := instanceManagerByTypeAndName(pm, plugin.Type, plugin.Name)
return im != nil
}, 5*time.Second, 10*time.Millisecond)

err = registry.DeregisterPlugin(plugin.Type, plugin.Name, "alloc-0")
require.NoError(t, err)

require.Eventually(t, func() bool {
_, ok := pm.instances[plugin.Type][plugin.Name]
return !ok
im := instanceManagerByTypeAndName(pm, plugin.Type, plugin.Name)
return im == nil
}, 5*time.Second, 10*time.Millisecond)
}

Expand All @@ -119,21 +114,21 @@ func TestManager_MultiplePlugins(t *testing.T) {
pm.Run()

require.Eventually(t, func() bool {
_, ok := pm.instances[controllerPlugin.Type][controllerPlugin.Name]
return ok
im := instanceManagerByTypeAndName(pm, controllerPlugin.Type, controllerPlugin.Name)
return im != nil
}, 5*time.Second, 10*time.Millisecond)

require.Eventually(t, func() bool {
_, ok := pm.instances[nodePlugin.Type][nodePlugin.Name]
return ok
im := instanceManagerByTypeAndName(pm, nodePlugin.Type, nodePlugin.Name)
return im != nil
}, 5*time.Second, 10*time.Millisecond)

err = registry.DeregisterPlugin(controllerPlugin.Type, controllerPlugin.Name, "alloc-0")
require.NoError(t, err)

require.Eventually(t, func() bool {
_, ok := pm.instances[controllerPlugin.Type][controllerPlugin.Name]
return !ok
im := instanceManagerByTypeAndName(pm, controllerPlugin.Type, controllerPlugin.Name)
return im == nil
}, 5*time.Second, 10*time.Millisecond)
}

Expand All @@ -154,8 +149,9 @@ func TestManager_ConcurrentPlugins(t *testing.T) {
require.NoError(t, registry.RegisterPlugin(plugin0))
require.NoError(t, registry.RegisterPlugin(plugin1))
require.Eventuallyf(t, func() bool {
im, _ := pm.instances[plugin0.Type][plugin0.Name]
return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" &&
im := instanceManagerByTypeAndName(pm, plugin0.Type, plugin0.Name)
return im != nil &&
im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" &&
im.allocID == "alloc-1"
}, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin did not become active plugin")

Expand All @@ -172,8 +168,9 @@ func TestManager_ConcurrentPlugins(t *testing.T) {
pm.Run()

require.Eventuallyf(t, func() bool {
im, _ := pm.instances[plugin0.Type][plugin0.Name]
return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" &&
im := instanceManagerByTypeAndName(pm, plugin0.Type, plugin0.Name)
return im != nil &&
im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" &&
im.allocID == "alloc-1"
}, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin was not active after state reload")

Expand All @@ -183,8 +180,9 @@ func TestManager_ConcurrentPlugins(t *testing.T) {

require.NoError(t, registry.RegisterPlugin(plugin2))
require.Eventuallyf(t, func() bool {
im, _ := pm.instances[plugin0.Type][plugin0.Name]
return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-2/csi.sock" &&
im := instanceManagerByTypeAndName(pm, plugin0.Type, plugin0.Name)
return im != nil &&
im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-2/csi.sock" &&
im.allocID == "alloc-2"
}, 5*time.Second, 10*time.Millisecond, "alloc-2 plugin was not active after replacement")

Expand All @@ -206,21 +204,32 @@ func TestManager_ConcurrentPlugins(t *testing.T) {
require.NoError(t, registry.RegisterPlugin(plugin1))

require.Eventuallyf(t, func() bool {
im, _ := pm.instances[plugin0.Type][plugin0.Name]
return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" &&
im := instanceManagerByTypeAndName(pm, plugin0.Type, plugin0.Name)
return im != nil &&
im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" &&
im.allocID == "alloc-1"
}, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin did not become active plugin")

registry.DeregisterPlugin(dynamicplugins.PluginTypeCSINode, "my-plugin", "alloc-0")

require.Eventuallyf(t, func() bool {
im, _ := pm.instances[plugin0.Type][plugin0.Name]
im := instanceManagerByTypeAndName(pm, plugin0.Type, plugin0.Name)
return im != nil &&
im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock"
}, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin should still be active plugin")
})
}

// instanceManagerByTypeAndName is a test helper to get the instance
// manager for the plugin, protected by the lock that the csiManager
// will normally do internally
func instanceManagerByTypeAndName(mgr *csiManager, pluginType, pluginName string) *instanceManager {
mgr.instancesLock.RLock()
defer mgr.instancesLock.RUnlock()
im, _ := mgr.instances[pluginType][pluginName]
return im
}

// MemDB implements a StateDB that stores data in memory and should only be
// used for testing. All methods are safe for concurrent use. This is a
// partial implementation of the MemDB in the client/state package, copied
Expand Down

0 comments on commit 109666a

Please sign in to comment.