Skip to content

Commit

Permalink
csi: fix prefix queries for plugin list RPC
Browse files Browse the repository at this point in the history
The `CSIPlugin.List` RPC was intended to accept a prefix to filter the
list of plugins being listed. This was being accidentally being done
in the state store instead, which contributed to incorrect filtering
behavior for plugins in the `volume snapshot list` command.

Move the prefix matching into the RPC so that it calls the
prefix-matching method in the state store if we're looking for a
prefix.

Update the `plugin status command` to accept a prefix for the plugin
ID argument so that it matches the expected behavior of other commands.
  • Loading branch information
tgross committed Mar 4, 2022
1 parent e3001f6 commit 91919fc
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 6 deletions.
23 changes: 23 additions & 0 deletions command/plugin_status_csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,29 @@ func (c *PluginStatusCommand) csiStatus(client *api.Client, id string) int {
return 0
}

// filter by plugin if a plugin ID was passed
plugs, _, err := client.CSIPlugins().List(&api.QueryOptions{Prefix: id})
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying CSI plugins: %s", err))
return 1
}
if len(plugs) == 0 {
c.Ui.Error(fmt.Sprintf("No plugins(s) with prefix or ID %q found", id))
return 1
}
if len(plugs) > 1 {
if id != plugs[0].ID {
out, err := c.csiFormatPlugins(plugs)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error formatting: %s", err))
return 1
}
c.Ui.Error(fmt.Sprintf("Prefix matched multiple plugins\n\n%s", out))
return 1
}
}
id = plugs[0].ID

// Lookup matched a single plugin
plug, _, err := client.CSIPlugins().Info(id, nil)
if err != nil {
Expand Down
18 changes: 14 additions & 4 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1294,10 +1294,20 @@ func (v *CSIPlugin) List(args *structs.CSIPluginListRequest, reply *structs.CSIP
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Query all plugins
iter, err := state.CSIPlugins(ws)
if err != nil {
return err

var iter memdb.ResultIterator
var err error
if args.Prefix != "" {
iter, err = state.CSIPluginsByIDPrefix(ws, args.Prefix)
if err != nil {
return err
}
} else {
// Query all plugins
iter, err = state.CSIPlugins(ws)
if err != nil {
return err
}
}

// Collect results
Expand Down
3 changes: 2 additions & 1 deletion nomad/csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,7 @@ func TestCSIVolumeEndpoint_ListExternal(t *testing.T) {
// List external volumes; note that none of these exist in the state store

req := &structs.CSIVolumeExternalListRequest{
PluginID: "minnie",
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
Expand Down Expand Up @@ -1371,8 +1372,8 @@ func TestCSIVolumeEndpoint_ListSnapshots(t *testing.T) {
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))

// List snapshots

req := &structs.CSISnapshotListRequest{
PluginID: "minnie",
Secrets: structs.CSISecrets{
"secret-key-1": "secret-val-1",
},
Expand Down
2 changes: 1 addition & 1 deletion nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2696,7 +2696,7 @@ func (s *StateStore) CSIPluginByID(ws memdb.WatchSet, id string) (*structs.CSIPl
// CSIPluginByIDTxn returns a named CSIPlugin
func (s *StateStore) CSIPluginByIDTxn(txn Txn, ws memdb.WatchSet, id string) (*structs.CSIPlugin, error) {

watchCh, obj, err := txn.FirstWatch("csi_plugins", "id_prefix", id)
watchCh, obj, err := txn.FirstWatch("csi_plugins", "id", id)
if err != nil {
return nil, fmt.Errorf("csi_plugin lookup failed: %s %v", id, err)
}
Expand Down

0 comments on commit 91919fc

Please sign in to comment.