Skip to content

Commit

Permalink
address some PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
gulducat committed Sep 7, 2023
1 parent 1951bac commit 713b270
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 19 deletions.
2 changes: 1 addition & 1 deletion client/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (c *CSI) ControllerExpandVolume(req *structs.ClientCSIControllerExpandVolum
}
resp.CapacityBytes = cresp.CapacityBytes
resp.NodeExpansionRequired = cresp.NodeExpansionRequired
return err
return nil
}

func (c *CSI) ControllerDeleteVolume(req *structs.ClientCSIControllerDeleteVolumeRequest, resp *structs.ClientCSIControllerDeleteVolumeResponse) error {
Expand Down
30 changes: 14 additions & 16 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,22 +382,22 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru
return nil
}

// reconcileVolume updates the "old" volume with many of the contents of "new"
// reconcileVolume updates a volume with many of the contents of another.
// It may or may not do extra work to actually expand a volume outside of Nomad,
// depending on whether requested capacity values have changed.
func (v *CSIVolume) reconcileVolume(plugin *structs.CSIPlugin, old *structs.CSIVolume, new *structs.CSIVolume) error {
func (v *CSIVolume) reconcileVolume(plugin *structs.CSIPlugin, vol *structs.CSIVolume, update *structs.CSIVolume) error {
// Merge does some validation, before we attempt any potential CSI RPCs,
// and mutates `old` with (most of) the values of `new`
err := old.Merge(new) // this skips capacity values
// and mutates `vol` with (most of) the values of `update`,
// notably excluding capacity values, which are covered below.
err := vol.Merge(update)
if err != nil {
return err
}
// expandVolume will mutate `old` with new capacity-related values, if needed.
err = v.expandVolume(old, plugin, &csi.CapacityRange{
RequiredBytes: new.RequestedCapacityMin,
LimitBytes: new.RequestedCapacityMax,
return v.expandVolume(vol, plugin, &csi.CapacityRange{
RequiredBytes: update.RequestedCapacityMin,
LimitBytes: update.RequestedCapacityMax,
})
return err
}

// Deregister removes a set of volumes
Expand Down Expand Up @@ -1059,8 +1059,6 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs.
}
validatedVols := []validated{}

ws := memdb.NewWatchSet()

// This is the only namespace we ACL checked, force all the volumes to use it.
// We also validate that the plugin exists for each plugin, and validate the
// capabilities when the plugin has a controller.
Expand Down Expand Up @@ -1088,7 +1086,7 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs.
return err
}
// current will be nil if it does not exist.
current, err := snap.CSIVolumeByID(ws, vol.Namespace, vol.ID)
current, err := snap.CSIVolumeByID(nil, vol.Namespace, vol.ID)
if err != nil {
return err
}
Expand Down Expand Up @@ -1117,7 +1115,7 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs.
cp := valid.current.Copy()
err = v.reconcileVolume(valid.plugin, cp, valid.vol)
if err != nil {
multierror.Append(&mErr, err)
mErr.Errors = append(mErr.Errors, err)
} else {
// we merged valid.vol into cp, so update state with the copy
regArgs.Volumes = append(regArgs.Volumes, cp)
Expand All @@ -1126,7 +1124,7 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs.
} else {
err = v.createVolume(valid.vol, valid.plugin)
if err != nil {
multierror.Append(&mErr, err)
mErr.Errors = append(mErr.Errors, err)
} else {
regArgs.Volumes = append(regArgs.Volumes, valid.vol)
}
Expand All @@ -1138,7 +1136,7 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs.
_, index, err = v.srv.raftApply(structs.CSIVolumeRegisterRequestType, regArgs)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "register")
multierror.Append(&mErr, err)
mErr.Errors = append(mErr.Errors, err)
}
}

Expand Down Expand Up @@ -1268,7 +1266,7 @@ func (v *CSIVolume) expandVolume(vol *structs.CSIVolume, plugin *structs.CSIPlug
cReq.PluginID = plugin.ID
cResp := &cstructs.ClientCSIControllerExpandVolumeResponse{}

logger().Info("expanding volume")
logger().Debug("expanding volume")
// This is the real work. The client RPC sends a gRPC to the controller plugin,
// then that controller may reach out to cloud APIs, etc.
err = v.serializedControllerRPC(plugin.ID, func() error {
Expand All @@ -1279,7 +1277,7 @@ func (v *CSIVolume) expandVolume(vol *structs.CSIVolume, plugin *structs.CSIPlug
}

vol.Capacity = cResp.CapacityBytes
logger().Info("controller done expanding")
logger().Debug("controller done expanding")

if cResp.NodeExpansionRequired {
v.logger.Warn("TODO: also do node volume expansion if needed") // TODO
Expand Down
4 changes: 2 additions & 2 deletions nomad/structs/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ package structs
import (
"errors"
"fmt"
"maps"
"slices"
"strings"
"time"

multierror "github.com/hashicorp/go-multierror"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"

"github.com/hashicorp/nomad/helper"
)
Expand Down

0 comments on commit 713b270

Please sign in to comment.