From 2339cadf9a20a9c17f4f9b536e183a665ff13902 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 30 Oct 2020 15:24:44 -0400 Subject: [PATCH 1/4] csi: failing test case demonstrating buggy CSIVolume.Copy --- nomad/structs/csi_test.go | 86 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/nomad/structs/csi_test.go b/nomad/structs/csi_test.go index d5f63b2413d1..b1c2ceda506c 100644 --- a/nomad/structs/csi_test.go +++ b/nomad/structs/csi_test.go @@ -1,7 +1,9 @@ package structs import ( + "reflect" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -43,6 +45,90 @@ func TestCSIVolumeClaim(t *testing.T) { require.True(t, vol.WriteFreeClaims()) } +func TestVolume_Copy(t *testing.T) { + + a1 := MockAlloc() + a2 := MockAlloc() + a3 := MockAlloc() + c1 := &CSIVolumeClaim{ + AllocationID: a1.ID, + NodeID: a1.NodeID, + ExternalNodeID: "c1", + Mode: CSIVolumeClaimRead, + State: CSIVolumeClaimStateTaken, + } + c2 := &CSIVolumeClaim{ + AllocationID: a2.ID, + NodeID: a2.NodeID, + ExternalNodeID: "c2", + Mode: CSIVolumeClaimRead, + State: CSIVolumeClaimStateNodeDetached, + } + c3 := &CSIVolumeClaim{ + AllocationID: a3.ID, + NodeID: a3.NodeID, + ExternalNodeID: "c3", + Mode: CSIVolumeClaimWrite, + State: CSIVolumeClaimStateTaken, + } + + v1 := &CSIVolume{ + ID: "vol1", + Name: "vol1", + ExternalID: "vol-abcdef", + Namespace: "default", + Topologies: []*CSITopology{{Segments: map[string]string{"AZ1": "123"}}}, + AccessMode: CSIVolumeAccessModeSingleNodeWriter, + AttachmentMode: CSIVolumeAttachmentModeBlockDevice, + MountOptions: &CSIMountOptions{FSType: "ext4", MountFlags: []string{"ro", "noatime"}}, + Secrets: CSISecrets{"mysecret": "myvalue"}, + Parameters: map[string]string{"param1": "val1"}, + Context: map[string]string{"ctx1": "val1"}, + + ReadAllocs: map[string]*Allocation{a1.ID: a1, a2.ID: nil}, + WriteAllocs: map[string]*Allocation{a3.ID: a3}, + + ReadClaims: map[string]*CSIVolumeClaim{a1.ID: c1, a2.ID: c2}, + WriteClaims: 
map[string]*CSIVolumeClaim{a3.ID: c3}, + PastClaims: map[string]*CSIVolumeClaim{}, + + Schedulable: true, + PluginID: "moosefs", + Provider: "n/a", + ProviderVersion: "1.0", + ControllerRequired: true, + ControllersHealthy: 2, + ControllersExpected: 2, + NodesHealthy: 4, + NodesExpected: 5, + ResourceExhausted: time.Now(), + } + + v2 := v1.Copy() + if !reflect.DeepEqual(v1, v2) { + t.Fatalf("Copy() returned an unequal Volume; got %#v; want %#v", v1, v2) + } + + v1.ReadClaims[a1.ID].State = CSIVolumeClaimStateReadyToFree + v1.ReadAllocs[a2.ID] = a2 + v1.WriteAllocs[a3.ID].ClientStatus = AllocClientStatusComplete + v1.MountOptions.FSType = "zfs" + + if v2.ReadClaims[a1.ID].State == CSIVolumeClaimStateReadyToFree { + t.Fatalf("Volume.Copy() failed; changes to original ReadClaims seen in copy") + } + if v2.ReadAllocs[a2.ID] != nil { + t.Fatalf("Volume.Copy() failed; changes to original ReadAllocs seen in copy") + } + if v2.WriteAllocs[a3.ID].ClientStatus == AllocClientStatusComplete { + t.Fatalf("Volume.Copy() failed; changes to original WriteAllocs seen in copy") + } + if v2.MountOptions.FSType == "zfs" { + t.Fatalf("Volume.Copy() failed; changes to original MountOptions seen in copy") + } + +} + func TestCSIPluginJobs(t *testing.T) { plug := NewCSIPlugin("foo", 1000) controller := &Job{ From dfdbc88c23eb543f91b3c43d3a30a8c7a8085a4a Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 30 Oct 2020 15:29:16 -0400 Subject: [PATCH 2/4] csi: minimal change to make test case pass --- nomad/structs/csi.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go index 79d3c9cab019..c1035a04a43d 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -315,6 +315,9 @@ func (v *CSIVolume) newStructs() { if v.Parameters == nil { v.Parameters = map[string]string{} } + if v.MountOptions == nil { + v.MountOptions = &CSIMountOptions{} + } if v.Secrets == nil { v.Secrets = CSISecrets{} } @@ -417,13 
+420,26 @@ func (v *CSIVolume) Copy() *CSIVolume { for k, v := range v.Secrets { out.Secrets[k] = v } + mo := *v.MountOptions + out.MountOptions = &mo for k, v := range v.ReadAllocs { - out.ReadAllocs[k] = v + if v != nil { + a := *v + out.ReadAllocs[k] = &a + } else { + out.ReadAllocs[k] = nil + } } for k, v := range v.WriteAllocs { - out.WriteAllocs[k] = v + if v != nil { + a := *v + out.WriteAllocs[k] = &a + } else { + out.WriteAllocs[k] = nil + } + } for k, v := range v.ReadClaims { From 6c408e5ea2352cd1f0ae28a1298e66e0eff96e8a Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 30 Oct 2020 16:34:31 -0400 Subject: [PATCH 3/4] csi: refactor CSIVolume Copy for clarity --- nomad/structs/csi.go | 78 +++++++++++++++++++------------------------- 1 file changed, 34 insertions(+), 44 deletions(-) diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go index c1035a04a43d..c0f9dc4df3d3 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -306,25 +306,14 @@ func NewCSIVolume(volumeID string, index uint64) *CSIVolume { } func (v *CSIVolume) newStructs() { - if v.Topologies == nil { - v.Topologies = []*CSITopology{} - } - if v.Context == nil { - v.Context = map[string]string{} - } - if v.Parameters == nil { - v.Parameters = map[string]string{} - } - if v.MountOptions == nil { - v.MountOptions = &CSIMountOptions{} - } - if v.Secrets == nil { - v.Secrets = CSISecrets{} - } + v.Topologies = []*CSITopology{} + v.MountOptions = new(CSIMountOptions) + v.Secrets = CSISecrets{} + v.Parameters = map[string]string{} + v.Context = map[string]string{} v.ReadAllocs = map[string]*Allocation{} v.WriteAllocs = map[string]*Allocation{} - v.ReadClaims = map[string]*CSIVolumeClaim{} v.WriteClaims = map[string]*CSIVolumeClaim{} v.PastClaims = map[string]*CSIVolumeClaim{} @@ -408,38 +397,31 @@ func (v *CSIVolume) InUse() bool { // Copy returns a copy of the volume, which shares only the Topologies slice func (v *CSIVolume) Copy() *CSIVolume { - copy := *v - out := &copy - 
out.newStructs() + out := new(CSIVolume) + *out = *v + out.newStructs() // zero-out the non-primitive structs + + for _, t := range v.Topologies { + out.Topologies = append(out.Topologies, t.Copy()) + } + if v.MountOptions != nil { + *out.MountOptions = *v.MountOptions + } + for k, v := range v.Secrets { + out.Secrets[k] = v + } for k, v := range v.Parameters { out.Parameters[k] = v } for k, v := range v.Context { out.Context[k] = v } - for k, v := range v.Secrets { - out.Secrets[k] = v - } - mo := *v.MountOptions - out.MountOptions = &mo - for k, v := range v.ReadAllocs { - if v != nil { - a := *v - out.ReadAllocs[k] = &a - } else { - out.ReadAllocs[k] = nil - } + for k, alloc := range v.ReadAllocs { + out.ReadAllocs[k] = alloc.Copy() } - - for k, v := range v.WriteAllocs { - if v != nil { - a := *v - out.WriteAllocs[k] = &a - } else { - out.WriteAllocs[k] = nil - } - + for k, alloc := range v.WriteAllocs { + out.WriteAllocs[k] = alloc.Copy() } for k, v := range v.ReadClaims { @@ -791,19 +773,19 @@ func (p *CSIPlugin) Copy() *CSIPlugin { out.newStructs() for k, v := range p.Controllers { - out.Controllers[k] = v + out.Controllers[k] = v.Copy() } for k, v := range p.Nodes { - out.Nodes[k] = v + out.Nodes[k] = v.Copy() } for k, v := range p.ControllerJobs { - out.ControllerJobs[k] = v + out.ControllerJobs[k] = v.Copy() } for k, v := range p.NodeJobs { - out.NodeJobs[k] = v + out.NodeJobs[k] = v.Copy() } return out @@ -1005,6 +987,14 @@ type JobDescription struct { // JobNamespacedDescriptions maps Job.ID to JobDescription type JobNamespacedDescriptions map[string]JobDescription +func (j JobNamespacedDescriptions) Copy() JobNamespacedDescriptions { + copy := JobNamespacedDescriptions{} + for k, v := range j { + copy[k] = v + } + return copy +} + // JobDescriptions maps Namespace to a mapping of Job.ID to JobDescription type JobDescriptions map[string]JobNamespacedDescriptions From 4de0f79fa3106843d93698365c2da361ee0e06cb Mon Sep 17 00:00:00 2001 From: Tim Gross Date: 
Mon, 9 Nov 2020 08:58:06 -0500 Subject: [PATCH 4/4] csi: clear alloc pointers from volume before writing to state store --- nomad/csi_endpoint_test.go | 4 ++++ nomad/state/state_store.go | 22 ++++++++++++++++++++++ nomad/structs/csi.go | 4 ++-- 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index ce28f5eee22b..ff91f0da6d8f 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -561,6 +561,10 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) { if tc.expectedErrMsg == "" { require.NoError(t, err) + vol, err = state.CSIVolumeByID(nil, ns, volID) + require.NoError(t, err) + require.NotNil(t, vol) + require.Len(t, vol.ReadAllocs, 0) } else { require.Error(t, err) require.True(t, strings.Contains(err.Error(), tc.expectedErrMsg), diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 87c453ba8554..977eec5bfbc2 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2072,6 +2072,17 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum v.ModifyIndex = index } + // Allocations are copy on write, so we want to keep the Allocation ID + // but we need to clear the pointer so that we don't store it when we + // write the volume to the state store. We'll get it from the db in + // denormalize. + for allocID := range v.ReadAllocs { + v.ReadAllocs[allocID] = nil + } + for allocID := range v.WriteAllocs { + v.WriteAllocs[allocID] = nil + } + err = txn.Insert("csi_volumes", v) if err != nil { return fmt.Errorf("volume insert: %v", err) @@ -2263,6 +2274,17 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *s volume.ModifyIndex = index + // Allocations are copy on write, so we want to keep the Allocation ID + // but we need to clear the pointer so that we don't store it when we + // write the volume to the state store. We'll get it from the db in + // denormalize. 
+ for allocID := range volume.ReadAllocs { + volume.ReadAllocs[allocID] = nil + } + for allocID := range volume.WriteAllocs { + volume.WriteAllocs[allocID] = nil + } + if err = txn.Insert("csi_volumes", volume); err != nil { return fmt.Errorf("volume update failed: %s: %v", id, err) } diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go index c0f9dc4df3d3..ade9181fbeb6 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -378,7 +378,7 @@ func (v *CSIVolume) WriteSchedulable() bool { func (v *CSIVolume) WriteFreeClaims() bool { switch v.AccessMode { case CSIVolumeAccessModeSingleNodeWriter, CSIVolumeAccessModeMultiNodeSingleWriter: - return len(v.WriteAllocs) == 0 + return len(v.WriteClaims) == 0 case CSIVolumeAccessModeMultiNodeMultiWriter: // the CSI spec doesn't allow for setting a max number of writers. // we track node resource exhaustion through v.ResourceExhausted @@ -496,7 +496,7 @@ func (v *CSIVolume) ClaimWrite(claim *CSIVolumeClaim, alloc *Allocation) error { if !v.WriteFreeClaims() { // Check the blocking allocations to see if they belong to this job for _, a := range v.WriteAllocs { - if a.Namespace != alloc.Namespace || a.JobID != alloc.JobID { + if a != nil && (a.Namespace != alloc.Namespace || a.JobID != alloc.JobID) { return fmt.Errorf("volume max claim reached") } }