Skip to content

Commit

Permalink
CSI: ensure we're using new snapshots after checkpoint
Browse files (browse the repository at this point in the history)
  • Loading branch information
tgross committed Sep 23, 2022
1 parent 9c3ea13 commit 87d22e7
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions nomad/csi_endpoint.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -693,14 +693,18 @@ RELEASE_CLAIM:
func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
v.logger.Trace("node unpublish", "vol", vol.ID)

store := v.srv.fsm.State()
// We need a new snapshot after each checkpoint
snap, err := v.srv.fsm.State().Snapshot()
if err != nil {
return err
}

// If the node has been GC'd or is down, we can't send it a node
// unpublish. We need to assume the node has unpublished at its
// end. If it hasn't, any controller unpublish will potentially
// hang or error and need to be retried.
if claim.NodeID != "" {
node, err := store.NodeByID(memdb.NewWatchSet(), claim.NodeID)
node, err := snap.NodeByID(memdb.NewWatchSet(), claim.NodeID)
if err != nil {
return err
}
Expand All @@ -723,7 +727,7 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
// The RPC sent from the 'nomad node detach' command or GC won't have an
// allocation ID set so we try to unpublish every terminal or invalid
// alloc on the node, all of which will be in PastClaims after denormalizing
vol, err := store.CSIVolumeDenormalize(memdb.NewWatchSet(), vol)
vol, err = snap.CSIVolumeDenormalize(memdb.NewWatchSet(), vol)
if err != nil {
return err
}
Expand Down Expand Up @@ -793,10 +797,15 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
return nil
}

state := v.srv.fsm.State()
// We need a new snapshot after each checkpoint
snap, err := v.srv.fsm.State().Snapshot()
if err != nil {
return err
}

ws := memdb.NewWatchSet()

plugin, err := state.CSIPluginByID(ws, vol.PluginID)
plugin, err := snap.CSIPluginByID(ws, vol.PluginID)
if err != nil {
return fmt.Errorf("could not query plugin: %v", err)
} else if plugin == nil {
Expand All @@ -808,7 +817,7 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
return nil
}

vol, err = state.CSIVolumeDenormalize(ws, vol)
vol, err = snap.CSIVolumeDenormalize(ws, vol)
if err != nil {
return err
}
Expand Down

0 comments on commit 87d22e7

Please sign in to comment.