Skip to content

Commit

Permalink
csi: remove nested read txn from volume write txns
Browse files Browse the repository at this point in the history
When making updates to CSI volumes, the state store methods that have open
write transactions were querying the state store using the same methods used
by the CSI RPC endpoint, but these method creates their own top-level read
transactions. These have yet not been implicated in any bugs.

Refactor the CSIVolume query methods to have an implementation method that
accepts a transaction, which can be called with either a read txn or a write
txn.
  • Loading branch information
tgross committed Nov 24, 2020
1 parent 962cd25 commit 8b88e59
Showing 1 changed file with 32 additions and 16 deletions.
48 changes: 32 additions & 16 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2133,7 +2133,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st
// we return the volume with the plugins denormalized by default,
// because the scheduler needs them for feasibility checking
vol := obj.(*structs.CSIVolume)
return s.CSIVolumeDenormalizePlugins(ws, vol.Copy())
return s.CSIVolumeDenormalizePluginsTxn(txn, vol.Copy())
}

// CSIVolumes looks up csi_volumes by pluginID. Caller should snapshot if it
Expand Down Expand Up @@ -2261,12 +2261,11 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *s
}
}

volume, err := s.CSIVolumeDenormalizePlugins(ws, orig.Copy())
volume, err := s.CSIVolumeDenormalizePluginsTxn(txn, orig.Copy())
if err != nil {
return err
}

volume, err = s.CSIVolumeDenormalize(ws, volume)
volume, err = s.CSIVolumeDenormalizeTxn(txn, nil, volume)
if err != nil {
return err
}
Expand Down Expand Up @@ -2330,7 +2329,7 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []s
// allocations have been stopped but claims can't be freed because
// ex. the plugins have all been removed.
if vol.InUse() {
if !force || !s.volSafeToForce(vol) {
if !force || !s.volSafeToForce(txn, vol) {
return fmt.Errorf("volume in use: %s", id)
}
}
Expand All @@ -2349,9 +2348,8 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []s

// volSafeToForce checks if the any of the remaining allocations
// are in a non-terminal state.
func (s *StateStore) volSafeToForce(v *structs.CSIVolume) bool {
ws := memdb.NewWatchSet()
vol, err := s.CSIVolumeDenormalize(ws, v)
func (s *StateStore) volSafeToForce(txn Txn, v *structs.CSIVolume) bool {
vol, err := s.CSIVolumeDenormalizeTxn(txn, nil, v)
if err != nil {
return false
}
Expand All @@ -2369,19 +2367,30 @@ func (s *StateStore) volSafeToForce(v *structs.CSIVolume) bool {
return true
}

// CSIVolumeDenormalizePlugins returns a CSIVolume with current health and plugins, but
// without allocations
// Use this for current volume metadata, handling lists of volumes
// Use CSIVolumeDenormalize for volumes containing both health and current allocations
// CSIVolumeDenormalizePlugins returns a CSIVolume with current health and
// plugins, but without allocations.
// Use this for current volume metadata, handling lists of volumes.
// Use CSIVolumeDenormalize for volumes containing both health and current
// allocations.
func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
if vol == nil {
return nil, nil
}
// Lookup CSIPlugin, the health records, and calculate volume health
txn := s.db.ReadTxn()
defer txn.Abort()
return s.CSIVolumeDenormalizePluginsTxn(txn, vol)
}

plug, err := s.CSIPluginByID(ws, vol.PluginID)
// CSIVolumeDenormalizePluginsTxn returns a CSIVolume with current health and
// plugins, but without allocations.
// Use this for current volume metadata, handling lists of volumes.
// Use CSIVolumeDenormalize for volumes containing both health and current
// allocations.
func (s *StateStore) CSIVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
if vol == nil {
return nil, nil
}
plug, err := s.CSIPluginByIDTxn(txn, nil, vol.PluginID)
if err != nil {
return nil, fmt.Errorf("plugin lookup error: %s %v", vol.PluginID, err)
}
Expand Down Expand Up @@ -2412,8 +2421,15 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs

// CSIVolumeDenormalize returns a CSIVolume with allocations
func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
txn := s.db.ReadTxn()
return s.CSIVolumeDenormalizeTxn(txn, ws, vol)
}

// CSIVolumeDenormalizeTxn populates a CSIVolume with allocations
func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {

for id := range vol.ReadAllocs {
a, err := s.AllocByID(ws, id)
a, err := s.allocByIDImpl(txn, ws, id)
if err != nil {
return nil, err
}
Expand All @@ -2434,7 +2450,7 @@ func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVol
}

for id := range vol.WriteAllocs {
a, err := s.AllocByID(ws, id)
a, err := s.allocByIDImpl(txn, ws, id)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 8b88e59

Please sign in to comment.