Skip to content

Commit

Permalink
csi: failing tests for concurrency bugs in plugin registration
Browse files Browse the repository at this point in the history
The dynamic plugin registry assumes that plugins are singletons, which
matches the behavior of other Nomad plugins. But because dynamic
plugins like CSI are implemented by allocations, we need to handle the
possibility of multiple allocations for a given plugin type + ID, as
well as behaviors around interleaved allocation starts and stops.
  • Loading branch information
tgross committed Feb 16, 2022
1 parent 1fabefd commit 0a46a31
Show file tree
Hide file tree
Showing 2 changed files with 297 additions and 58 deletions.
124 changes: 124 additions & 0 deletions client/dynamicplugins/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dynamicplugins

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -221,6 +222,129 @@ func TestDynamicRegistry_StateStore(t *testing.T) {
require.NoError(t, err)
}

// TestDynamicRegistry_ConcurrentAllocs exercises the registry when more than
// one allocation registers the same plugin type and name, covering restore
// ordering after a client restart, replacement after a host restart, and a
// deregistration that arrives after the replacement has already registered.
func TestDynamicRegistry_ConcurrentAllocs(t *testing.T) {

	t.Parallel()

	// dispenseFn hands back the PluginInfo itself so that each subtest can
	// inspect which registration the registry currently considers active.
	dispenseFn := func(i *PluginInfo) (interface{}, error) {
		return i, nil
	}

	// newPlugin builds a CSI node plugin registration for "my-plugin" whose
	// alloc ID, version, and socket path are all derived from idx.
	newPlugin := func(idx int) *PluginInfo {
		id := fmt.Sprintf("alloc-%d", idx)
		return &PluginInfo{
			Name:    "my-plugin",
			Type:    PluginTypeCSINode,
			Version: fmt.Sprintf("v%d", idx),
			ConnectionInfo: &PluginConnectionInfo{
				SocketPath: "/var/data/alloc/" + id + "/csi.sock"},
			AllocID: id,
		}
	}

	// dispensePlugin dispenses "my-plugin" from reg and returns it as a
	// *PluginInfo, failing the calling test on any error, nil result, or
	// unexpected result type.
	dispensePlugin := func(t *testing.T, reg Registry) *PluginInfo {
		t.Helper()
		result, err := reg.DispensePlugin(PluginTypeCSINode, "my-plugin")
		// check the error first so a dispense failure is reported as such
		// rather than as a nil result
		require.NoError(t, err)
		require.NotNil(t, result)
		plugin, ok := result.(*PluginInfo)
		require.True(t, ok, "dispensed plugin has unexpected type %T", result)
		return plugin
	}

	t.Run("restore races on client restart", func(t *testing.T) {
		plugin0 := newPlugin(0)
		plugin1 := newPlugin(1)

		memdb := &MemDB{}
		oldR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn})

		// add a plugin and a new alloc running the same plugin
		// (without stopping the old one)
		require.NoError(t, oldR.RegisterPlugin(plugin0))
		require.NoError(t, oldR.RegisterPlugin(plugin1))
		plugin := dispensePlugin(t, oldR)
		require.Equal(t, "alloc-1", plugin.AllocID)

		// client restarts and we load state from disk.
		// most recently inserted plugin is current

		newR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn})
		// dispense from the new registry so that we verify the state it
		// was constructed with, not the pre-restart registry
		plugin = dispensePlugin(t, newR)
		require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath)
		require.Equal(t, "alloc-1", plugin.AllocID)

		// RestoreTask fires for all allocations, which runs the
		// plugin_supervisor_hook. But there's a race and the allocations
		// in this scenario are Restored in the opposite order they were
		// created

		require.NoError(t, newR.RegisterPlugin(plugin0))
		plugin = dispensePlugin(t, newR)
		// TODO: this currently fails because the RestoreTask races
		// between the two allocations and the old plugin is overwritten
		require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath)
		require.Equal(t, "alloc-1", plugin.AllocID)
	})

	t.Run("replacement races on host restart", func(t *testing.T) {
		plugin0 := newPlugin(0)
		plugin1 := newPlugin(1)
		plugin2 := newPlugin(2)

		memdb := &MemDB{}
		oldR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn})

		// add a plugin and a new alloc running the same plugin
		// (without stopping the old one)
		require.NoError(t, oldR.RegisterPlugin(plugin0))
		require.NoError(t, oldR.RegisterPlugin(plugin1))
		plugin := dispensePlugin(t, oldR)
		require.Equal(t, "alloc-1", plugin.AllocID)

		// client restarts and we load state from disk.
		// most recently inserted plugin is current

		newR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn})
		// dispense from the new registry so that we verify the state it
		// was constructed with, not the pre-restart registry
		plugin = dispensePlugin(t, newR)
		require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath)
		require.Equal(t, "alloc-1", plugin.AllocID)

		// RestoreTask fires for all allocations but none of them are
		// running because we restarted the whole host
		//
		// TODO: csi_hooks fail in this window because we'll send to a
		// socket no one is listening on! We won't be able to
		// unpublish either!

		// server gives us a replacement alloc

		require.NoError(t, newR.RegisterPlugin(plugin2))
		plugin = dispensePlugin(t, newR)
		require.Equal(t, "/var/data/alloc/alloc-2/csi.sock", plugin.ConnectionInfo.SocketPath)
		require.Equal(t, "alloc-2", plugin.AllocID)
	})

	t.Run("interleaved register and deregister", func(t *testing.T) {
		plugin0 := newPlugin(0)
		plugin1 := newPlugin(1)

		memdb := &MemDB{}
		reg := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn})

		require.NoError(t, reg.RegisterPlugin(plugin0))

		// replacement is registered before old plugin deregisters
		require.NoError(t, reg.RegisterPlugin(plugin1))
		plugin := dispensePlugin(t, reg)
		require.Equal(t, "alloc-1", plugin.AllocID)

		// NOTE(review): any value returned by DeregisterPlugin is not
		// checked here -- confirm whether it returns an error worth asserting
		reg.DeregisterPlugin(PluginTypeCSINode, "my-plugin")
		// TODO: this currently fails because the Deregister lost the race
		// and removes the plugin outright, leaving no running plugin
		plugin = dispensePlugin(t, reg)
		require.Equal(t, "alloc-1", plugin.AllocID)
	})

}

// MemDB implements a StateDB that stores data in memory and should only be
// used for testing. All methods are safe for concurrent use. This is a
// partial implementation of the MemDB in the client/state package, copied
Expand Down
Loading

0 comments on commit 0a46a31

Please sign in to comment.