Skip to content

Commit

Permalink
csi: update plugin registry to allow multiple allocs per plugin
Browse files Browse the repository at this point in the history
The dynamic plugin registry assumes that plugins are singletons, which
matches the behavior of other Nomad plugins. But because dynamic
plugins like CSI are implemented by allocations, we need to handle the
possibility of multiple allocations for a given plugin type + ID, as
well as behaviors around interleaved allocation starts and stops.

Update the data structure for the dynamic registry so that more recent
allocations take over as the instance manager singleton, but we still
preserve the previous running allocations so that restores work
without racing.
  • Loading branch information
tgross committed Feb 17, 2022
1 parent 0a46a31 commit 1b8eb1e
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 89 deletions.
3 changes: 2 additions & 1 deletion client/allocrunner/taskrunner/plugin_supervisor_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,9 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat
// closes over its own registration
rname := reg.Name
rtype := reg.Type
allocID := reg.AllocID
deregistrationFns = append(deregistrationFns, func() {
err := h.runner.dynamicRegistry.DeregisterPlugin(rtype, rname)
err := h.runner.dynamicRegistry.DeregisterPlugin(rtype, rname, allocID)
if err != nil {
h.logger.Error("failed to deregister csi plugin", "name", rname, "type", rtype, "error", err)
}
Expand Down
80 changes: 60 additions & 20 deletions client/dynamicplugins/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package dynamicplugins

import (
"container/list"
"context"
"errors"
"fmt"
Expand All @@ -19,7 +20,7 @@ const (
// that are running as Nomad Tasks.
type Registry interface {
RegisterPlugin(info *PluginInfo) error
DeregisterPlugin(ptype, name string) error
DeregisterPlugin(ptype, name, allocID string) error

ListPlugins(ptype string) []*PluginInfo
DispensePlugin(ptype, name string) (interface{}, error)
Expand All @@ -31,20 +32,29 @@ type Registry interface {
StubDispenserForType(ptype string, dispenser PluginDispenser)
}

// RegistryState is what we persist in the client state store. It contains
// a map of plugin types to maps of plugin name -> PluginInfo.
// RegistryState is what we persist in the client state
// store. It contains a map of plugin types to maps of plugin name ->
// list of *PluginInfo, sorted by recency of registration
type RegistryState struct {
Plugins map[string]map[string]*PluginInfo
Plugins map[string]map[string]*list.List
}

// TODO(tgross): LegacyRegistryState is the v1 state persisted in the
// client state store. We need to do a data migration in the boltdb
// store on restart.
//
// type LegacyRegistryState struct {
// Plugins map[string]map[string]*PluginInfo
// }

type PluginDispenser func(info *PluginInfo) (interface{}, error)

// NewRegistry takes a map of `plugintype` to PluginDispenser functions
// that should be used to vend clients for plugins to be used.
func NewRegistry(state StateStorage, dispensers map[string]PluginDispenser) Registry {

registry := &dynamicRegistry{
plugins: make(map[string]map[string]*PluginInfo),
plugins: make(map[string]map[string]*list.List),
broadcasters: make(map[string]*pluginEventBroadcaster),
dispensers: dispensers,
state: state,
Expand Down Expand Up @@ -122,7 +132,7 @@ type PluginUpdateEvent struct {
}

type dynamicRegistry struct {
plugins map[string]map[string]*PluginInfo
plugins map[string]map[string]*list.List
pluginsLock sync.RWMutex

broadcasters map[string]*pluginEventBroadcaster
Expand Down Expand Up @@ -180,18 +190,35 @@ func (d *dynamicRegistry) RegisterPlugin(info *PluginInfo) error {

pmap, ok := d.plugins[info.Type]
if !ok {
pmap = make(map[string]*PluginInfo, 1)
pmap = make(map[string]*list.List)
d.plugins[info.Type] = pmap
}
infos, ok := pmap[info.Name]
if !ok {
infos = list.New()
pmap[info.Name] = infos
}

pmap[info.Name] = info

broadcaster := d.broadcasterForPluginType(info.Type)
event := &PluginUpdateEvent{
EventType: EventTypeRegistered,
Info: info,
// TODO(tgross): https://github.com/hashicorp/nomad/issues/11786
// If we're already registered, we should update the definition
// and send a broadcast of any update so the instanceManager can
// be restarted if there's been a change
var alreadyRegistered bool
for e := infos.Front(); e != nil; e = e.Next() {
if e.Value.(*PluginInfo).AllocID == info.AllocID {
alreadyRegistered = true
break
}
}
if !alreadyRegistered {
infos.PushFront(info)
broadcaster := d.broadcasterForPluginType(info.Type)
event := &PluginUpdateEvent{
EventType: EventTypeRegistered,
Info: info,
}
broadcaster.broadcast(event)
}
broadcaster.broadcast(event)

return d.sync()
}
Expand All @@ -209,7 +236,7 @@ func (d *dynamicRegistry) broadcasterForPluginType(ptype string) *pluginEventBro
return broadcaster
}

func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error {
func (d *dynamicRegistry) DeregisterPlugin(ptype, name, allocID string) error {
d.pluginsLock.Lock()
defer d.pluginsLock.Unlock()

Expand All @@ -223,19 +250,30 @@ func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error {
// developers during the development of new plugin types.
return errors.New("must specify plugin name to deregister")
}
if allocID == "" {
return errors.New("must specify plugin allocation ID to deregister")
}

pmap, ok := d.plugins[ptype]
if !ok {
// If this occurs there's a bug in the registration handler.
return fmt.Errorf("no plugins registered for type: %s", ptype)
}

info, ok := pmap[name]
infos, ok := pmap[name]
if !ok {
// plugin already deregistered, don't send events or try re-deleting.
return nil
}
delete(pmap, name)

var info *PluginInfo
for e := infos.Front(); e != nil; e = e.Next() {
info = e.Value.(*PluginInfo)
if info.AllocID == allocID {
infos.Remove(e)
break
}
}

broadcaster := d.broadcasterForPluginType(ptype)
event := &PluginUpdateEvent{
Expand All @@ -259,7 +297,9 @@ func (d *dynamicRegistry) ListPlugins(ptype string) []*PluginInfo {
plugins := make([]*PluginInfo, 0, len(pmap))

for _, info := range pmap {
plugins = append(plugins, info)
if info.Front() != nil {
plugins = append(plugins, info.Front().Value.(*PluginInfo))
}
}

return plugins
Expand Down Expand Up @@ -302,11 +342,11 @@ func (d *dynamicRegistry) DispensePlugin(ptype string, name string) (interface{}
}

info, ok := pmap[name]
if !ok {
if !ok || info.Front() == nil {
return nil, fmt.Errorf("plugin %s for type %s not found", name, ptype)
}

return dispenseFunc(info)
return dispenseFunc(info.Front().Value.(*PluginInfo))
}

// PluginsUpdatedCh returns a channel over which plugin events for the requested
Expand Down
26 changes: 10 additions & 16 deletions client/dynamicplugins/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,12 @@ func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) {
err := r.RegisterPlugin(&PluginInfo{
Type: "csi",
Name: "my-plugin",
AllocID: "alloc-0",
ConnectionInfo: &PluginConnectionInfo{},
})
require.NoError(t, err)

err = r.DeregisterPlugin("csi", "my-plugin")
err = r.DeregisterPlugin("csi", "my-plugin", "alloc-0")
require.NoError(t, err)

require.Eventually(t, func() bool {
Expand Down Expand Up @@ -179,21 +180,23 @@ func TestDynamicRegistry_IsolatePluginTypes(t *testing.T) {
err := r.RegisterPlugin(&PluginInfo{
Type: PluginTypeCSIController,
Name: "my-plugin",
AllocID: "alloc-0",
ConnectionInfo: &PluginConnectionInfo{},
})
require.NoError(t, err)

err = r.RegisterPlugin(&PluginInfo{
Type: PluginTypeCSINode,
Name: "my-plugin",
AllocID: "alloc-1",
ConnectionInfo: &PluginConnectionInfo{},
})
require.NoError(t, err)

err = r.DeregisterPlugin(PluginTypeCSIController, "my-plugin")
err = r.DeregisterPlugin(PluginTypeCSIController, "my-plugin", "alloc-0")
require.NoError(t, err)
require.Equal(t, len(r.ListPlugins(PluginTypeCSINode)), 1)
require.Equal(t, len(r.ListPlugins(PluginTypeCSIController)), 0)
require.Equal(t, 1, len(r.ListPlugins(PluginTypeCSINode)))
require.Equal(t, 0, len(r.ListPlugins(PluginTypeCSIController)))
}

func TestDynamicRegistry_StateStore(t *testing.T) {
Expand Down Expand Up @@ -278,8 +281,6 @@ func TestDynamicRegistry_ConcurrentAllocs(t *testing.T) {

require.NoError(t, newR.RegisterPlugin(plugin0))
plugin = dispensePlugin(t, newR)
// TODO: this currently fails because the RestoreTask races
// between the two allocations and the old plugin is overwritten
require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath)
require.Equal(t, "alloc-1", plugin.AllocID)
})
Expand Down Expand Up @@ -308,13 +309,8 @@ func TestDynamicRegistry_ConcurrentAllocs(t *testing.T) {
require.Equal(t, "alloc-1", plugin.AllocID)

// RestoreTask fires for all allocations but none of them are
// running because we restarted the whole host
//
// TODO: csi_hooks fail in this window because we'll send to a
// socket no one is listening on! We won't be able to
// unpublish either!

// server gives us a replacement alloc
// running because we restarted the whole host. Server gives
// us a replacement alloc

require.NoError(t, newR.RegisterPlugin(plugin2))
plugin = dispensePlugin(t, newR)
Expand All @@ -336,9 +332,7 @@ func TestDynamicRegistry_ConcurrentAllocs(t *testing.T) {
plugin := dispensePlugin(t, reg)
require.Equal(t, "alloc-1", plugin.AllocID)

reg.DeregisterPlugin(PluginTypeCSINode, "my-plugin")
// TODO: this currently fails because the Deregister lost the race
// and removes the plugin outright, leaving no running plugin
reg.DeregisterPlugin(PluginTypeCSINode, "my-plugin", "alloc-0")
plugin = dispensePlugin(t, reg)
require.Equal(t, "alloc-1", plugin.AllocID)
})
Expand Down
22 changes: 16 additions & 6 deletions client/pluginmanager/csimanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func New(config *Config) Manager {

type csiManager struct {
// instances should only be accessed from the run() goroutine and the shutdown
// fn. It is a map of PluginType : [PluginName : instanceManager]
// fn. It is a map of PluginType : [PluginName : *instanceManager]
instances map[string]map[string]*instanceManager

registry dynamicplugins.Registry
Expand Down Expand Up @@ -167,11 +167,19 @@ func (c *csiManager) ensureInstance(plugin *dynamicplugins.PluginInfo) {
name := plugin.Name
ptype := plugin.Type
instances := c.instancesForType(ptype)
if _, ok := instances[name]; !ok {
c.logger.Debug("detected new CSI plugin", "name", name, "type", ptype)
mgr, ok := instances[name]
if !ok {
c.logger.Debug("detected new CSI plugin", "name", name, "type", ptype, "alloc", plugin.AllocID)
mgr := newInstanceManager(c.logger, c.eventer, c.updateNodeCSIInfoFunc, plugin)
instances[name] = mgr
mgr.run()
} else if mgr.allocID != plugin.AllocID {
mgr.shutdown()
c.logger.Debug("detected update for CSI plugin", "name", name, "type", ptype, "alloc", plugin.AllocID)
mgr := newInstanceManager(c.logger, c.eventer, c.updateNodeCSIInfoFunc, plugin)
instances[name] = mgr
mgr.run()

}
}

Expand All @@ -182,9 +190,11 @@ func (c *csiManager) ensureNoInstance(plugin *dynamicplugins.PluginInfo) {
ptype := plugin.Type
instances := c.instancesForType(ptype)
if mgr, ok := instances[name]; ok {
c.logger.Debug("shutting down CSI plugin", "name", name, "type", ptype)
mgr.shutdown()
delete(instances, name)
if mgr.allocID == plugin.AllocID {
c.logger.Debug("shutting down CSI plugin", "name", name, "type", ptype, "alloc", plugin.AllocID)
mgr.shutdown()
delete(instances, name)
}
}
}

Expand Down
Loading

0 comments on commit 1b8eb1e

Please sign in to comment.