Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSI: allow for concurrent plugin allocations #12078

Merged
merged 6 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions client/dynamicplugins/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dynamicplugins

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -221,6 +222,129 @@ func TestDynamicRegistry_StateStore(t *testing.T) {
require.NoError(t, err)
}

func TestDynamicRegistry_ConcurrentAllocs(t *testing.T) {

t.Parallel()
dispenseFn := func(i *PluginInfo) (interface{}, error) {
return i, nil
}

newPlugin := func(idx int) *PluginInfo {
id := fmt.Sprintf("alloc-%d", idx)
return &PluginInfo{
Name: "my-plugin",
Type: PluginTypeCSINode,
Version: fmt.Sprintf("v%d", idx),
ConnectionInfo: &PluginConnectionInfo{
SocketPath: "/var/data/alloc/" + id + "/csi.sock"},
AllocID: id,
}
}

dispensePlugin := func(t *testing.T, reg Registry) *PluginInfo {
result, err := reg.DispensePlugin(PluginTypeCSINode, "my-plugin")
require.NotNil(t, result)
require.NoError(t, err)
Comment on lines +249 to +250
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lgfa29 I think I'm getting a false positive on the semgrep rule here: https://github.com/hashicorp/nomad/runs/5222371451?check_suite_focus=true Arguably these assertions aren't all that useful, so I wouldn't be broken-hearted if I just had to remove them. But any suggestions for a fix?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh interesting, I haven't thought of this pattern before. Feel free to leave the code as-is and ignore the check for now. I will try to adjust the rules to match this.

plugin := result.(*PluginInfo)
return plugin
}

t.Run("restore races on client restart", func(t *testing.T) {
plugin0 := newPlugin(0)
plugin1 := newPlugin(1)

memdb := &MemDB{}
oldR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn})

// add a plugin and a new alloc running the same plugin
// (without stopping the old one)
require.NoError(t, oldR.RegisterPlugin(plugin0))
require.NoError(t, oldR.RegisterPlugin(plugin1))
plugin := dispensePlugin(t, oldR)
require.Equal(t, "alloc-1", plugin.AllocID)

// client restarts and we load state from disk.
// most recently inserted plugin is current

newR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn})
plugin = dispensePlugin(t, oldR)
require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath)
require.Equal(t, "alloc-1", plugin.AllocID)

// RestoreTask fires for all allocations, which runs the
// plugin_supervisor_hook. But there's a race and the allocations
// in this scenario are Restored in the opposite order they were
// created

require.NoError(t, newR.RegisterPlugin(plugin0))
plugin = dispensePlugin(t, newR)
// TODO: this currently fails because the RestoreTask races
// between the two allocations and the old plugin is overwritten
require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath)
require.Equal(t, "alloc-1", plugin.AllocID)
})

t.Run("replacement races on host restart", func(t *testing.T) {
plugin0 := newPlugin(0)
plugin1 := newPlugin(1)
plugin2 := newPlugin(2)

memdb := &MemDB{}
oldR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn})

// add a plugin and a new alloc running the same plugin
// (without stopping the old one)
require.NoError(t, oldR.RegisterPlugin(plugin0))
require.NoError(t, oldR.RegisterPlugin(plugin1))
plugin := dispensePlugin(t, oldR)
require.Equal(t, "alloc-1", plugin.AllocID)

// client restarts and we load state from disk.
// most recently inserted plugin is current

newR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn})
plugin = dispensePlugin(t, oldR)
require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath)
require.Equal(t, "alloc-1", plugin.AllocID)

// RestoreTask fires for all allocations but none of them are
// running because we restarted the whole host
//
// TODO: csi_hooks fail in this window because we'll send to a
// socket no one is listening on! We won't be able to
// unpublish either!

// server gives us a replacement alloc

require.NoError(t, newR.RegisterPlugin(plugin2))
plugin = dispensePlugin(t, newR)
require.Equal(t, "/var/data/alloc/alloc-2/csi.sock", plugin.ConnectionInfo.SocketPath)
require.Equal(t, "alloc-2", plugin.AllocID)
})

t.Run("interleaved register and deregister", func(t *testing.T) {
plugin0 := newPlugin(0)
plugin1 := newPlugin(1)

memdb := &MemDB{}
reg := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn})

require.NoError(t, reg.RegisterPlugin(plugin0))

// replacement is registered before old plugin deregisters
require.NoError(t, reg.RegisterPlugin(plugin1))
plugin := dispensePlugin(t, reg)
require.Equal(t, "alloc-1", plugin.AllocID)

reg.DeregisterPlugin(PluginTypeCSINode, "my-plugin")
// TODO: this currently fails because the Deregister lost the race
// and removes the plugin outright, leaving no running plugin
plugin = dispensePlugin(t, reg)
require.Equal(t, "alloc-1", plugin.AllocID)
})

}

// MemDB implements a StateDB that stores data in memory and should only be
// used for testing. All methods are safe for concurrent use. This is a
// partial implementation of the MemDB in the client/state package, copied
Expand Down
Loading