Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSI: fix transaction handling in state store #9438

Merged
merged 8 commits into from
Nov 25, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 33 additions & 21 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,19 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
snap, err := state.Snapshot()
if err != nil {
return err
}
// Query all volumes
var err error
var iter memdb.ResultIterator

if args.NodeID != "" {
iter, err = state.CSIVolumesByNodeID(ws, args.NodeID)
iter, err = snap.CSIVolumesByNodeID(ws, args.NodeID)
} else if args.PluginID != "" {
iter, err = state.CSIVolumesByPluginID(ws, ns, args.PluginID)
iter, err = snap.CSIVolumesByPluginID(ws, ns, args.PluginID)
} else {
iter, err = state.CSIVolumesByNamespace(ws, ns)
iter, err = snap.CSIVolumesByNamespace(ws, ns)
}

if err != nil {
Expand All @@ -140,23 +143,25 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV
if raw == nil {
break
}

vol := raw.(*structs.CSIVolume)
vol, err := state.CSIVolumeDenormalizePlugins(ws, vol.Copy())
if err != nil {
return err
}

// Remove (possibly again) by PluginID to handle passing both NodeID and PluginID
// Remove (possibly again) by PluginID to handle passing both
// NodeID and PluginID
if args.PluginID != "" && args.PluginID != vol.PluginID {
continue
}

// Remove by Namespace, since CSIVolumesByNodeID hasn't used the Namespace yet
// Remove by Namespace, since CSIVolumesByNodeID hasn't used
// the Namespace yet
if vol.Namespace != ns {
continue
}

vol, err := snap.CSIVolumeDenormalizePlugins(ws, vol.Copy())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the significance of copying the volume here, but not during Get?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CSIVolumeByID query in Get does a CSIVolumeDenormalizePlugins in the state store, so we already copy in the state store function. (I'm not wild about how that works currently because it's too easy to miss... I didn't want to get too wild with the refactoring in this PR though.)

if err != nil {
return err
}

vs = append(vs, vol.Stub())
}
reply.Volumes = vs
Expand Down Expand Up @@ -195,12 +200,17 @@ func (v *CSIVolume) Get(args *structs.CSIVolumeGetRequest, reply *structs.CSIVol
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
vol, err := state.CSIVolumeByID(ws, ns, args.ID)
snap, err := state.Snapshot()
if err != nil {
return err
}

vol, err := snap.CSIVolumeByID(ws, ns, args.ID)
if err != nil {
return err
}
if vol != nil {
vol, err = state.CSIVolumeDenormalize(ws, vol)
vol, err = snap.CSIVolumeDenormalize(ws, vol)
}
if err != nil {
return err
Expand All @@ -214,9 +224,8 @@ func (v *CSIVolume) Get(args *structs.CSIVolumeGetRequest, reply *structs.CSIVol

func (v *CSIVolume) pluginValidateVolume(req *structs.CSIVolumeRegisterRequest, vol *structs.CSIVolume) (*structs.CSIPlugin, error) {
state := v.srv.fsm.State()
ws := memdb.NewWatchSet()

plugin, err := state.CSIPluginByID(ws, vol.PluginID)
plugin, err := state.CSIPluginByID(nil, vol.PluginID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -481,9 +490,7 @@ func (v *CSIVolume) controllerPublishVolume(req *structs.CSIVolumeClaimRequest,

func (v *CSIVolume) volAndPluginLookup(namespace, volID string) (*structs.CSIPlugin, *structs.CSIVolume, error) {
state := v.srv.fsm.State()
ws := memdb.NewWatchSet()

vol, err := state.CSIVolumeByID(ws, namespace, volID)
vol, err := state.CSIVolumeByID(nil, namespace, volID)
if err != nil {
return nil, nil, err
}
Expand All @@ -497,7 +504,7 @@ func (v *CSIVolume) volAndPluginLookup(namespace, volID string) (*structs.CSIPlu
// note: we do this same lookup in CSIVolumeByID but then throw
// away the pointer to the plugin rather than attaching it to
// the volume so we have to do it again here.
plug, err := state.CSIPluginByID(ws, vol.PluginID)
plug, err := state.CSIPluginByID(nil, vol.PluginID)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -870,7 +877,12 @@ func (v *CSIPlugin) Get(args *structs.CSIPluginGetRequest, reply *structs.CSIPlu
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
plug, err := state.CSIPluginByID(ws, args.ID)
snap, err := state.Snapshot()
if err != nil {
return err
}

plug, err := snap.CSIPluginByID(ws, args.ID)
if err != nil {
return err
}
Expand All @@ -880,7 +892,7 @@ func (v *CSIPlugin) Get(args *structs.CSIPluginGetRequest, reply *structs.CSIPlu
}

if withAllocs {
plug, err = state.CSIPluginDenormalize(ws, plug.Copy())
plug, err = snap.CSIPluginDenormalize(ws, plug.Copy())
if err != nil {
return err
}
Expand Down
Loading