Skip to content

Commit

Permalink
[wip] update registry to use linked list of plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross committed Feb 17, 2022
1 parent 0a46a31 commit 5c73101
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 82 deletions.
3 changes: 2 additions & 1 deletion client/allocrunner/taskrunner/plugin_supervisor_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,9 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat
// closes over its own registration
rname := reg.Name
rtype := reg.Type
allocID := reg.AllocID
deregistrationFns = append(deregistrationFns, func() {
err := h.runner.dynamicRegistry.DeregisterPlugin(rtype, rname)
err := h.runner.dynamicRegistry.DeregisterPlugin(rtype, rname, allocID)
if err != nil {
h.logger.Error("failed to deregister csi plugin", "name", rname, "type", rtype, "error", err)
}
Expand Down
67 changes: 54 additions & 13 deletions client/dynamicplugins/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package dynamicplugins

import (
"container/list"
"context"
"errors"
"fmt"
Expand All @@ -19,7 +20,7 @@ const (
// that are running as Nomad Tasks.
type Registry interface {
RegisterPlugin(info *PluginInfo) error
DeregisterPlugin(ptype, name string) error
DeregisterPlugin(ptype, name, allocID string) error

ListPlugins(ptype string) []*PluginInfo
DispensePlugin(ptype, name string) (interface{}, error)
Expand All @@ -31,20 +32,28 @@ type Registry interface {
StubDispenserForType(ptype string, dispenser PluginDispenser)
}

// RegistryState is what we persist in the client state store. It contains
// a map of plugin types to maps of plugin name -> PluginInfo.
// RegistryState is what we persist in the client state
// store. It contains a map of plugin types to maps of plugin name ->
// list of *PluginInfo, sorted by recency of registration
type RegistryState struct {
Plugins map[string]map[string]*PluginInfo
Plugins map[string]map[string]*list.List
}

// TODO: LegacyRegistryState is the v1 state persisted in the client
// state store. We need to do a data migration in the boltdb store on
//
// type LegacyRegistryState struct {
// Plugins map[string]map[string]*PluginInfo
// }

type PluginDispenser func(info *PluginInfo) (interface{}, error)

// NewRegistry takes a map of `plugintype` to PluginDispenser functions
// that should be used to vend clients for plugins to be used.
func NewRegistry(state StateStorage, dispensers map[string]PluginDispenser) Registry {

registry := &dynamicRegistry{
plugins: make(map[string]map[string]*PluginInfo),
plugins: make(map[string]map[string]*list.List),
broadcasters: make(map[string]*pluginEventBroadcaster),
dispensers: dispensers,
state: state,
Expand Down Expand Up @@ -122,7 +131,7 @@ type PluginUpdateEvent struct {
}

type dynamicRegistry struct {
plugins map[string]map[string]*PluginInfo
plugins map[string]map[string]*list.List
pluginsLock sync.RWMutex

broadcasters map[string]*pluginEventBroadcaster
Expand Down Expand Up @@ -180,11 +189,28 @@ func (d *dynamicRegistry) RegisterPlugin(info *PluginInfo) error {

pmap, ok := d.plugins[info.Type]
if !ok {
pmap = make(map[string]*PluginInfo, 1)
pmap = make(map[string]*list.List)
d.plugins[info.Type] = pmap
}
infos, ok := pmap[info.Name]
if !ok {
infos = list.New()
pmap[info.Name] = infos
}

pmap[info.Name] = info
// TODO: if it's already registered, we should update-in-place
// with the new values?
var alreadyRegistered bool
for e := infos.Front(); e != nil; e = e.Next() {
if e.Value.(*PluginInfo).AllocID == info.AllocID {
alreadyRegistered = true
break
}
}
if alreadyRegistered {
return d.sync()
}
infos.PushFront(info)

broadcaster := d.broadcasterForPluginType(info.Type)
event := &PluginUpdateEvent{
Expand All @@ -209,7 +235,7 @@ func (d *dynamicRegistry) broadcasterForPluginType(ptype string) *pluginEventBro
return broadcaster
}

func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error {
func (d *dynamicRegistry) DeregisterPlugin(ptype, name, allocID string) error {
d.pluginsLock.Lock()
defer d.pluginsLock.Unlock()

Expand All @@ -223,19 +249,32 @@ func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error {
// developers during the development of new plugin types.
return errors.New("must specify plugin name to deregister")
}
if allocID == "" {
// This error shouldn't make it to a production cluster and is to aid
// developers during the development of new plugin types.
return errors.New("must specify plugin allocation ID to deregister")
}

pmap, ok := d.plugins[ptype]
if !ok {
// If this occurs there's a bug in the registration handler.
return fmt.Errorf("no plugins registered for type: %s", ptype)
}

info, ok := pmap[name]
infos, ok := pmap[name]
if !ok {
// plugin already deregistered, don't send events or try re-deleting.
return nil
}
delete(pmap, name)

var info *PluginInfo
for e := infos.Front(); e != nil; e = e.Next() {
info = e.Value.(*PluginInfo)
if info.AllocID == allocID {
infos.Remove(e)
break
}
}

broadcaster := d.broadcasterForPluginType(ptype)
event := &PluginUpdateEvent{
Expand All @@ -259,7 +298,9 @@ func (d *dynamicRegistry) ListPlugins(ptype string) []*PluginInfo {
plugins := make([]*PluginInfo, 0, len(pmap))

for _, info := range pmap {
plugins = append(plugins, info)
if info.Front() != nil {
plugins = append(plugins, info.Front().Value.(*PluginInfo))
}
}

return plugins
Expand Down Expand Up @@ -306,7 +347,7 @@ func (d *dynamicRegistry) DispensePlugin(ptype string, name string) (interface{}
return nil, fmt.Errorf("plugin %s for type %s not found", name, ptype)
}

return dispenseFunc(info)
return dispenseFunc(info.Front().Value.(*PluginInfo))
}

// PluginsUpdatedCh returns a channel over which plugin events for the requested
Expand Down
26 changes: 10 additions & 16 deletions client/dynamicplugins/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,12 @@ func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) {
err := r.RegisterPlugin(&PluginInfo{
Type: "csi",
Name: "my-plugin",
AllocID: "alloc-0",
ConnectionInfo: &PluginConnectionInfo{},
})
require.NoError(t, err)

err = r.DeregisterPlugin("csi", "my-plugin")
err = r.DeregisterPlugin("csi", "my-plugin", "alloc-0")
require.NoError(t, err)

require.Eventually(t, func() bool {
Expand Down Expand Up @@ -179,21 +180,23 @@ func TestDynamicRegistry_IsolatePluginTypes(t *testing.T) {
err := r.RegisterPlugin(&PluginInfo{
Type: PluginTypeCSIController,
Name: "my-plugin",
AllocID: "alloc-0",
ConnectionInfo: &PluginConnectionInfo{},
})
require.NoError(t, err)

err = r.RegisterPlugin(&PluginInfo{
Type: PluginTypeCSINode,
Name: "my-plugin",
AllocID: "alloc-1",
ConnectionInfo: &PluginConnectionInfo{},
})
require.NoError(t, err)

err = r.DeregisterPlugin(PluginTypeCSIController, "my-plugin")
err = r.DeregisterPlugin(PluginTypeCSIController, "my-plugin", "alloc-0")
require.NoError(t, err)
require.Equal(t, len(r.ListPlugins(PluginTypeCSINode)), 1)
require.Equal(t, len(r.ListPlugins(PluginTypeCSIController)), 0)
require.Equal(t, 1, len(r.ListPlugins(PluginTypeCSINode)))
require.Equal(t, 0, len(r.ListPlugins(PluginTypeCSIController)))
}

func TestDynamicRegistry_StateStore(t *testing.T) {
Expand Down Expand Up @@ -278,8 +281,6 @@ func TestDynamicRegistry_ConcurrentAllocs(t *testing.T) {

require.NoError(t, newR.RegisterPlugin(plugin0))
plugin = dispensePlugin(t, newR)
// TODO: this currently fails because the RestoreTask races
// between the two allocations and the old plugin is overwritten
require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath)
require.Equal(t, "alloc-1", plugin.AllocID)
})
Expand Down Expand Up @@ -308,13 +309,8 @@ func TestDynamicRegistry_ConcurrentAllocs(t *testing.T) {
require.Equal(t, "alloc-1", plugin.AllocID)

// RestoreTask fires for all allocations but none of them are
// running because we restarted the whole host
//
// TODO: csi_hooks fail in this window because we'll send to a
// socket no one is listening on! We won't be able to
// unpublish either!

// server gives us a replacement alloc
// running because we restarted the whole host. Server gives
// us a replacement alloc

require.NoError(t, newR.RegisterPlugin(plugin2))
plugin = dispensePlugin(t, newR)
Expand All @@ -336,9 +332,7 @@ func TestDynamicRegistry_ConcurrentAllocs(t *testing.T) {
plugin := dispensePlugin(t, reg)
require.Equal(t, "alloc-1", plugin.AllocID)

reg.DeregisterPlugin(PluginTypeCSINode, "my-plugin")
// TODO: this currently fails because the Deregister lost the race
// and removes the plugin outright, leaving no running plugin
reg.DeregisterPlugin(PluginTypeCSINode, "my-plugin", "alloc-0")
plugin = dispensePlugin(t, reg)
require.Equal(t, "alloc-1", plugin.AllocID)
})
Expand Down
22 changes: 16 additions & 6 deletions client/pluginmanager/csimanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func New(config *Config) Manager {

type csiManager struct {
// instances should only be accessed from the run() goroutine and the shutdown
// fn. It is a map of PluginType : [PluginName : instanceManager]
// fn. It is a map of PluginType : [PluginName : *instanceManager]
instances map[string]map[string]*instanceManager

registry dynamicplugins.Registry
Expand Down Expand Up @@ -167,11 +167,19 @@ func (c *csiManager) ensureInstance(plugin *dynamicplugins.PluginInfo) {
name := plugin.Name
ptype := plugin.Type
instances := c.instancesForType(ptype)
if _, ok := instances[name]; !ok {
c.logger.Debug("detected new CSI plugin", "name", name, "type", ptype)
mgr, ok := instances[name]
if !ok {
c.logger.Debug("detected new CSI plugin", "name", name, "type", ptype, "alloc", plugin.AllocID)
mgr := newInstanceManager(c.logger, c.eventer, c.updateNodeCSIInfoFunc, plugin)
instances[name] = mgr
mgr.run()
} else if mgr.allocID != plugin.AllocID {
mgr.shutdown()
c.logger.Debug("detected update for CSI plugin", "name", name, "type", ptype, "alloc", plugin.AllocID)
mgr := newInstanceManager(c.logger, c.eventer, c.updateNodeCSIInfoFunc, plugin)
instances[name] = mgr
mgr.run()

}
}

Expand All @@ -182,9 +190,11 @@ func (c *csiManager) ensureNoInstance(plugin *dynamicplugins.PluginInfo) {
ptype := plugin.Type
instances := c.instancesForType(ptype)
if mgr, ok := instances[name]; ok {
c.logger.Debug("shutting down CSI plugin", "name", name, "type", ptype)
mgr.shutdown()
delete(instances, name)
if mgr.allocID == plugin.AllocID {
c.logger.Debug("shutting down CSI plugin", "name", name, "type", ptype, "alloc", plugin.AllocID)
mgr.shutdown()
delete(instances, name)
}
}
}

Expand Down
Loading

0 comments on commit 5c73101

Please sign in to comment.