diff --git a/client/allocrunner/taskrunner/plugin_supervisor_hook.go b/client/allocrunner/taskrunner/plugin_supervisor_hook.go index 679fb2f73967..eaea7682d42e 100644 --- a/client/allocrunner/taskrunner/plugin_supervisor_hook.go +++ b/client/allocrunner/taskrunner/plugin_supervisor_hook.go @@ -348,8 +348,9 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat // closes over its own registration rname := reg.Name rtype := reg.Type + allocID := reg.AllocID deregistrationFns = append(deregistrationFns, func() { - err := h.runner.dynamicRegistry.DeregisterPlugin(rtype, rname) + err := h.runner.dynamicRegistry.DeregisterPlugin(rtype, rname, allocID) if err != nil { h.logger.Error("failed to deregister csi plugin", "name", rname, "type", rtype, "error", err) } diff --git a/client/dynamicplugins/registry.go b/client/dynamicplugins/registry.go index ed1710ee568d..07c3852386d2 100644 --- a/client/dynamicplugins/registry.go +++ b/client/dynamicplugins/registry.go @@ -4,6 +4,7 @@ package dynamicplugins import ( + "container/list" "context" "errors" "fmt" @@ -19,7 +20,7 @@ const ( // that are running as Nomad Tasks. type Registry interface { RegisterPlugin(info *PluginInfo) error - DeregisterPlugin(ptype, name string) error + DeregisterPlugin(ptype, name, allocID string) error ListPlugins(ptype string) []*PluginInfo DispensePlugin(ptype, name string) (interface{}, error) @@ -31,12 +32,20 @@ type Registry interface { StubDispenserForType(ptype string, dispenser PluginDispenser) } -// RegistryState is what we persist in the client state store. It contains -// a map of plugin types to maps of plugin name -> PluginInfo. +// RegistryState is what we persist in the client state +// store. It contains a map of plugin types to maps of plugin name -> +// list of *PluginInfo, sorted by recency of registration type RegistryState struct { - Plugins map[string]map[string]*PluginInfo + Plugins map[string]map[string]*list.List } +// TODO: LegacyRegistryState is the v1 state persisted in the client +// state store. We need to do a data migration in the boltdb store on +// +// type LegacyRegistryState struct { +// Plugins map[string]map[string]*PluginInfo +// } + type PluginDispenser func(info *PluginInfo) (interface{}, error) // NewRegistry takes a map of `plugintype` to PluginDispenser functions @@ -44,7 +53,7 @@ type PluginDispenser func(info *PluginInfo) (interface{}, error) func NewRegistry(state StateStorage, dispensers map[string]PluginDispenser) Registry { registry := &dynamicRegistry{ - plugins: make(map[string]map[string]*PluginInfo), + plugins: make(map[string]map[string]*list.List), broadcasters: make(map[string]*pluginEventBroadcaster), dispensers: dispensers, state: state, @@ -122,7 +131,7 @@ type PluginUpdateEvent struct { } type dynamicRegistry struct { - plugins map[string]map[string]*PluginInfo + plugins map[string]map[string]*list.List pluginsLock sync.RWMutex broadcasters map[string]*pluginEventBroadcaster @@ -180,11 +189,28 @@ func (d *dynamicRegistry) RegisterPlugin(info *PluginInfo) error { pmap, ok := d.plugins[info.Type] if !ok { - pmap = make(map[string]*PluginInfo, 1) + pmap = make(map[string]*list.List) d.plugins[info.Type] = pmap } + infos, ok := pmap[info.Name] + if !ok { + infos = list.New() + pmap[info.Name] = infos + } - pmap[info.Name] = info + // TODO: if it's already registered, we should update-in-place + // with the new values? + var alreadyRegistered bool + for e := infos.Front(); e != nil; e = e.Next() { + if e.Value.(*PluginInfo).AllocID == info.AllocID { + alreadyRegistered = true + break + } + } + if alreadyRegistered { + return d.sync() + } + infos.PushFront(info) broadcaster := d.broadcasterForPluginType(info.Type) event := &PluginUpdateEvent{ @@ -209,7 +235,7 @@ func (d *dynamicRegistry) broadcasterForPluginType(ptype string) *pluginEventBro return broadcaster } -func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error { +func (d *dynamicRegistry) DeregisterPlugin(ptype, name, allocID string) error { d.pluginsLock.Lock() defer d.pluginsLock.Unlock() @@ -223,6 +249,11 @@ func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error { // developers during the development of new plugin types. return errors.New("must specify plugin name to deregister") } + if allocID == "" { + // This error shouldn't make it to a production cluster and is to aid + // developers during the development of new plugin types. + return errors.New("must specify plugin allocation ID to deregister") + } pmap, ok := d.plugins[ptype] if !ok { @@ -230,12 +261,20 @@ func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error { return fmt.Errorf("no plugins registered for type: %s", ptype) } - info, ok := pmap[name] + infos, ok := pmap[name] if !ok { // plugin already deregistered, don't send events or try re-deleting. return nil } - delete(pmap, name) + + var info *PluginInfo + for e := infos.Front(); e != nil; e = e.Next() { + info = e.Value.(*PluginInfo) + if info.AllocID == allocID { + infos.Remove(e) + break + } + } broadcaster := d.broadcasterForPluginType(ptype) event := &PluginUpdateEvent{ @@ -259,7 +298,9 @@ func (d *dynamicRegistry) ListPlugins(ptype string) []*PluginInfo { plugins := make([]*PluginInfo, 0, len(pmap)) for _, info := range pmap { - plugins = append(plugins, info) + if info.Front() != nil { + plugins = append(plugins, info.Front().Value.(*PluginInfo)) + } } return plugins @@ -306,7 +347,7 @@ func (d *dynamicRegistry) DispensePlugin(ptype string, name string) (interface{} return nil, fmt.Errorf("plugin %s for type %s not found", name, ptype) } - return dispenseFunc(info) + return dispenseFunc(info.Front().Value.(*PluginInfo)) } // PluginsUpdatedCh returns a channel over which plugin events for the requested diff --git a/client/dynamicplugins/registry_test.go b/client/dynamicplugins/registry_test.go index 28e626e8cc64..a820a675f8d4 100644 --- a/client/dynamicplugins/registry_test.go +++ b/client/dynamicplugins/registry_test.go @@ -133,11 +133,12 @@ func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) { err := r.RegisterPlugin(&PluginInfo{ Type: "csi", Name: "my-plugin", + AllocID: "alloc-0", ConnectionInfo: &PluginConnectionInfo{}, }) require.NoError(t, err) - err = r.DeregisterPlugin("csi", "my-plugin") + err = r.DeregisterPlugin("csi", "my-plugin", "alloc-0") require.NoError(t, err) require.Eventually(t, func() bool { @@ -179,6 +180,7 @@ func TestDynamicRegistry_IsolatePluginTypes(t *testing.T) { err := r.RegisterPlugin(&PluginInfo{ Type: PluginTypeCSIController, Name: "my-plugin", + AllocID: "alloc-0", ConnectionInfo: &PluginConnectionInfo{}, }) require.NoError(t, err) @@ -186,14 +188,15 @@ func TestDynamicRegistry_IsolatePluginTypes(t *testing.T) { err = r.RegisterPlugin(&PluginInfo{ Type: PluginTypeCSINode, Name: "my-plugin", + AllocID: "alloc-1", ConnectionInfo: &PluginConnectionInfo{}, }) require.NoError(t, err) - err = r.DeregisterPlugin(PluginTypeCSIController, "my-plugin") + err = r.DeregisterPlugin(PluginTypeCSIController, "my-plugin", "alloc-0") require.NoError(t, err) - require.Equal(t, len(r.ListPlugins(PluginTypeCSINode)), 1) - require.Equal(t, len(r.ListPlugins(PluginTypeCSIController)), 0) + require.Equal(t, 1, len(r.ListPlugins(PluginTypeCSINode))) + require.Equal(t, 0, len(r.ListPlugins(PluginTypeCSIController))) } func TestDynamicRegistry_StateStore(t *testing.T) { @@ -278,8 +281,6 @@ func TestDynamicRegistry_ConcurrentAllocs(t *testing.T) { require.NoError(t, newR.RegisterPlugin(plugin0)) plugin = dispensePlugin(t, newR) - // TODO: this currently fails because the RestoreTask races - // between the two allocations and the old plugin is overwritten require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath) require.Equal(t, "alloc-1", plugin.AllocID) }) @@ -308,13 +309,8 @@ func TestDynamicRegistry_ConcurrentAllocs(t *testing.T) { require.Equal(t, "alloc-1", plugin.AllocID) // RestoreTask fires for all allocations but none of them are - // running because we restarted the whole host - // - // TODO: csi_hooks fail in this window because we'll send to a - // socket no one is listening on! We won't be able to - // unpublish either! - - // server gives us a replacement alloc + // running because we restarted the whole host. Server gives + // us a replacement alloc require.NoError(t, newR.RegisterPlugin(plugin2)) plugin = dispensePlugin(t, newR) @@ -336,9 +332,7 @@ func TestDynamicRegistry_ConcurrentAllocs(t *testing.T) { plugin := dispensePlugin(t, reg) require.Equal(t, "alloc-1", plugin.AllocID) - reg.DeregisterPlugin(PluginTypeCSINode, "my-plugin") - // TODO: this currently fails because the Deregister lost the race - // and removes the plugin outright, leaving no running plugin + reg.DeregisterPlugin(PluginTypeCSINode, "my-plugin", "alloc-0") plugin = dispensePlugin(t, reg) require.Equal(t, "alloc-1", plugin.AllocID) }) diff --git a/client/pluginmanager/csimanager/manager.go b/client/pluginmanager/csimanager/manager.go index 11dc66b4c19e..d2b5bf78d50e 100644 --- a/client/pluginmanager/csimanager/manager.go +++ b/client/pluginmanager/csimanager/manager.go @@ -55,7 +55,7 @@ func New(config *Config) Manager { type csiManager struct { // instances should only be accessed from the run() goroutine and the shutdown - // fn. It is a map of PluginType : [PluginName : instanceManager] + // fn. It is a map of PluginType : [PluginName : *instanceManager] instances map[string]map[string]*instanceManager registry dynamicplugins.Registry @@ -167,11 +167,19 @@ func (c *csiManager) ensureInstance(plugin *dynamicplugins.PluginInfo) { name := plugin.Name ptype := plugin.Type instances := c.instancesForType(ptype) - if _, ok := instances[name]; !ok { - c.logger.Debug("detected new CSI plugin", "name", name, "type", ptype) + mgr, ok := instances[name] + if !ok { + c.logger.Debug("detected new CSI plugin", "name", name, "type", ptype, "alloc", plugin.AllocID) + mgr := newInstanceManager(c.logger, c.eventer, c.updateNodeCSIInfoFunc, plugin) + instances[name] = mgr + mgr.run() + } else if mgr.allocID != plugin.AllocID { + mgr.shutdown() + c.logger.Debug("detected update for CSI plugin", "name", name, "type", ptype, "alloc", plugin.AllocID) mgr := newInstanceManager(c.logger, c.eventer, c.updateNodeCSIInfoFunc, plugin) instances[name] = mgr mgr.run() + } } @@ -182,9 +190,11 @@ func (c *csiManager) ensureNoInstance(plugin *dynamicplugins.PluginInfo) { ptype := plugin.Type instances := c.instancesForType(ptype) if mgr, ok := instances[name]; ok { - c.logger.Debug("shutting down CSI plugin", "name", name, "type", ptype) - mgr.shutdown() - delete(instances, name) + if mgr.allocID == plugin.AllocID { + c.logger.Debug("shutting down CSI plugin", "name", name, "type", ptype, "alloc", plugin.AllocID) + mgr.shutdown() + delete(instances, name) + } } } diff --git a/client/pluginmanager/csimanager/manager_test.go b/client/pluginmanager/csimanager/manager_test.go index 47b8c2a187e0..6ece1df469d7 100644 --- a/client/pluginmanager/csimanager/manager_test.go +++ b/client/pluginmanager/csimanager/manager_test.go @@ -27,6 +27,15 @@ func fakePlugin(idx int, pluginType string) *dynamicplugins.PluginInfo { } } +func testManager(t *testing.T, registry dynamicplugins.Registry, resyncPeriod time.Duration) *csiManager { + return New(&Config{ + Logger: testlog.HCLogger(t), + DynamicRegistry: registry, + UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {}, + PluginResyncPeriod: resyncPeriod, + }).(*csiManager) +} + func setupRegistry(reg *MemDB) dynamicplugins.Registry { return dynamicplugins.NewRegistry( reg, @@ -43,13 +52,7 @@ func setupRegistry(reg *MemDB) dynamicplugins.Registry { func TestManager_RegisterPlugin(t *testing.T) { registry := setupRegistry(nil) defer registry.Shutdown() - - cfg := &Config{ - Logger: testlog.HCLogger(t), - DynamicRegistry: registry, - UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {}, - } - pm := New(cfg).(*csiManager) + pm := testManager(t, registry, time.Hour) defer pm.Shutdown() plugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController) @@ -72,14 +75,7 @@ func TestManager_RegisterPlugin(t *testing.T) { func TestManager_DeregisterPlugin(t *testing.T) { registry := setupRegistry(nil) defer registry.Shutdown() - - cfg := &Config{ - Logger: testlog.HCLogger(t), - DynamicRegistry: registry, - UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {}, - PluginResyncPeriod: 500 * time.Millisecond, - } - pm := New(cfg).(*csiManager) + pm := testManager(t, registry, 500*time.Millisecond) defer pm.Shutdown() plugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController) @@ -93,7 +89,7 @@ func TestManager_DeregisterPlugin(t *testing.T) { return ok }, 5*time.Second, 10*time.Millisecond) - err = registry.DeregisterPlugin(plugin.Type, plugin.Name) + err = registry.DeregisterPlugin(plugin.Type, plugin.Name, "alloc-0") require.NoError(t, err) require.Eventually(t, func() bool { @@ -109,13 +105,7 @@ func TestManager_MultiplePlugins(t *testing.T) { registry := setupRegistry(nil) defer registry.Shutdown() - cfg := &Config{ - Logger: testlog.HCLogger(t), - DynamicRegistry: registry, - UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {}, - PluginResyncPeriod: 500 * time.Millisecond, - } - pm := New(cfg).(*csiManager) + pm := testManager(t, registry, 500*time.Millisecond) defer pm.Shutdown() controllerPlugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController) @@ -138,7 +128,7 @@ func TestManager_MultiplePlugins(t *testing.T) { return ok }, 5*time.Second, 10*time.Millisecond) - err = registry.DeregisterPlugin(controllerPlugin.Type, controllerPlugin.Name) + err = registry.DeregisterPlugin(controllerPlugin.Type, controllerPlugin.Name, "alloc-0") require.NoError(t, err) require.Eventually(t, func() bool { @@ -151,15 +141,6 @@ func TestManager_MultiplePlugins(t *testing.T) { // allocations for the same plugin interact func TestManager_ConcurrentPlugins(t *testing.T) { - testManager := func(registry dynamicplugins.Registry) *csiManager { - return New(&Config{ - Logger: testlog.HCLogger(t), - DynamicRegistry: registry, - UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {}, - PluginResyncPeriod: time.Hour, // don't resync except via events - }).(*csiManager) - } - t.Run("replacement races on host restart", func(t *testing.T) { plugin0 := fakePlugin(0, dynamicplugins.PluginTypeCSINode) plugin1 := fakePlugin(1, dynamicplugins.PluginTypeCSINode) @@ -167,7 +148,7 @@ func TestManager_ConcurrentPlugins(t *testing.T) { db := &MemDB{} registry := setupRegistry(db) - pm := testManager(registry) + pm := testManager(t, registry, time.Hour) // no resync except from events pm.Run() require.NoError(t, registry.RegisterPlugin(plugin0)) @@ -186,7 +167,7 @@ func TestManager_ConcurrentPlugins(t *testing.T) { registry = setupRegistry(db) defer registry.Shutdown() - pm = testManager(registry) + pm = testManager(t, registry, time.Hour) defer pm.Shutdown() pm.Run() @@ -197,13 +178,8 @@ func TestManager_ConcurrentPlugins(t *testing.T) { }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin was not active after state reload") // RestoreTask fires for all allocations but none of them are - // running because we restarted the whole host - // - // TODO: csi_hooks fail in this window because we'll send to a - // socket no one is listening on! We won't be able to - // unpublish either! - - // server gives us a replacement alloc + // running because we restarted the whole host. Server gives + // us a replacement alloc require.NoError(t, registry.RegisterPlugin(plugin2)) require.Eventuallyf(t, func() bool { @@ -222,7 +198,7 @@ func TestManager_ConcurrentPlugins(t *testing.T) { registry := setupRegistry(db) defer registry.Shutdown() - pm := testManager(registry) + pm := testManager(t, registry, time.Hour) // no resync except from events defer pm.Shutdown() pm.Run() @@ -235,10 +211,8 @@ func TestManager_ConcurrentPlugins(t *testing.T) { im.allocID == "alloc-1" }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin did not become active plugin") - registry.DeregisterPlugin(dynamicplugins.PluginTypeCSINode, "my-plugin") + registry.DeregisterPlugin(dynamicplugins.PluginTypeCSINode, "my-plugin", "alloc-0") - // TODO: this currently fails because the Deregister lost the race - // and removes the plugin outright, leaving no running plugin require.Eventuallyf(t, func() bool { im, _ := pm.instances[plugin0.Type][plugin0.Name] return im != nil &&