Skip to content

Commit

Permalink
csi: clear alloc pointers from volume before writing to state store
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross committed Nov 18, 2020
1 parent 6c408e5 commit 4de0f79
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 2 deletions.
4 changes: 4 additions & 0 deletions nomad/csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,10 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {

if tc.expectedErrMsg == "" {
require.NoError(t, err)
vol, err = state.CSIVolumeByID(nil, ns, volID)
require.NoError(t, err)
require.NotNil(t, vol)
require.Len(t, vol.ReadAllocs, 0)
} else {
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), tc.expectedErrMsg),
Expand Down
22 changes: 22 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2072,6 +2072,17 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum
v.ModifyIndex = index
}

// Allocations are copy on write, so we want to keep the Allocation ID
// but we need to clear the pointer so that we don't store it when we
// write the volume to the state store. We'll get it from the db in
// denormalize.
for allocID := range v.ReadAllocs {
v.ReadAllocs[allocID] = nil
}
for allocID := range v.WriteAllocs {
v.WriteAllocs[allocID] = nil
}

err = txn.Insert("csi_volumes", v)
if err != nil {
return fmt.Errorf("volume insert: %v", err)
Expand Down Expand Up @@ -2263,6 +2274,17 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *s

volume.ModifyIndex = index

// Allocations are copy on write, so we want to keep the Allocation ID
// but we need to clear the pointer so that we don't store it when we
// write the volume to the state store. We'll get it from the db in
// denormalize.
for allocID := range volume.ReadAllocs {
volume.ReadAllocs[allocID] = nil
}
for allocID := range volume.WriteAllocs {
volume.WriteAllocs[allocID] = nil
}

if err = txn.Insert("csi_volumes", volume); err != nil {
return fmt.Errorf("volume update failed: %s: %v", id, err)
}
Expand Down
4 changes: 2 additions & 2 deletions nomad/structs/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (v *CSIVolume) WriteSchedulable() bool {
func (v *CSIVolume) WriteFreeClaims() bool {
switch v.AccessMode {
case CSIVolumeAccessModeSingleNodeWriter, CSIVolumeAccessModeMultiNodeSingleWriter:
return len(v.WriteAllocs) == 0
return len(v.WriteClaims) == 0
case CSIVolumeAccessModeMultiNodeMultiWriter:
// the CSI spec doesn't allow for setting a max number of writers.
// we track node resource exhaustion through v.ResourceExhausted
Expand Down Expand Up @@ -496,7 +496,7 @@ func (v *CSIVolume) ClaimWrite(claim *CSIVolumeClaim, alloc *Allocation) error {
if !v.WriteFreeClaims() {
// Check the blocking allocations to see if they belong to this job
for _, a := range v.WriteAllocs {
if a.Namespace != alloc.Namespace || a.JobID != alloc.JobID {
if a != nil && (a.Namespace != alloc.Namespace || a.JobID != alloc.JobID) {
return fmt.Errorf("volume max claim reached")
}
}
Expand Down

0 comments on commit 4de0f79

Please sign in to comment.