Skip to content

Commit

Permalink
CSI: tests to exercise csi_hook (#11788)
Browse files Browse the repository at this point in the history
Small refactoring of the allocrunner hook for CSI to make it more
testable, and a unit test that covers most of its logic.
  • Loading branch information
tgross committed Jan 28, 2022
1 parent 9154033 commit 37dea7c
Show file tree
Hide file tree
Showing 3 changed files with 267 additions and 24 deletions.
2 changes: 1 addition & 1 deletion client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
}),
newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
newCSIHook(ar, hookLogger, alloc, ar.rpcClient, ar.csiManager, hrs),
newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, hrs, ar.clientConfig.Node.SecretID),
}

return nil
Expand Down
55 changes: 32 additions & 23 deletions client/allocrunner/csi_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,35 @@ import (
//
// It is a noop for allocs that do not depend on CSI Volumes.
type csiHook struct {
ar *allocRunner
alloc *structs.Allocation
logger hclog.Logger
csimanager csimanager.Manager
rpcClient RPCer
updater hookResourceSetter
alloc *structs.Allocation
logger hclog.Logger
csimanager csimanager.Manager
rpcClient RPCer
taskCapabilityGetter taskCapabilityGetter
updater hookResourceSetter
nodeSecret string

volumeRequests map[string]*volumeAndRequest
}

// implemented by allocrunner
type taskCapabilityGetter interface {
GetTaskDriverCapabilities(string) (*drivers.Capabilities, error)
}

func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.Manager, rpcClient RPCer, taskCapabilityGetter taskCapabilityGetter, updater hookResourceSetter, nodeSecret string) *csiHook {
return &csiHook{
alloc: alloc,
logger: logger.Named("csi_hook"),
csimanager: csi,
rpcClient: rpcClient,
taskCapabilityGetter: taskCapabilityGetter,
updater: updater,
nodeSecret: nodeSecret,
volumeRequests: map[string]*volumeAndRequest{},
}
}

func (c *csiHook) Name() string {
return "csi_hook"
}
Expand Down Expand Up @@ -103,7 +123,7 @@ func (c *csiHook) Postrun() error {
WriteRequest: structs.WriteRequest{
Region: c.alloc.Job.Region,
Namespace: c.alloc.Job.Namespace,
AuthToken: c.ar.clientConfig.Node.SecretID,
AuthToken: c.nodeSecret,
},
}
err := c.rpcClient.RPC("CSIVolume.Unpublish",
Expand Down Expand Up @@ -136,7 +156,7 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
if volumeRequest.Type == structs.VolumeTypeCSI {

for _, task := range tg.Tasks {
caps, err := c.ar.GetTaskDriverCapabilities(task.Name)
caps, err := c.taskCapabilityGetter.GetTaskDriverCapabilities(task.Name)
if err != nil {
return nil, fmt.Errorf("could not validate task driver capabilities: %v", err)
}
Expand Down Expand Up @@ -167,13 +187,13 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
WriteRequest: structs.WriteRequest{
Region: c.alloc.Job.Region,
Namespace: c.alloc.Job.Namespace,
AuthToken: c.ar.clientConfig.Node.SecretID,
AuthToken: c.nodeSecret,
},
}

var resp structs.CSIVolumeClaimResponse
if err := c.rpcClient.RPC("CSIVolume.Claim", req, &resp); err != nil {
return nil, err
return nil, fmt.Errorf("could not claim volume %s: %w", req.VolumeID, err)
}

if resp.Volume == nil {
Expand All @@ -187,19 +207,8 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
return result, nil
}

func newCSIHook(ar *allocRunner, logger hclog.Logger, alloc *structs.Allocation, rpcClient RPCer, csi csimanager.Manager, updater hookResourceSetter) *csiHook {
return &csiHook{
ar: ar,
alloc: alloc,
logger: logger.Named("csi_hook"),
rpcClient: rpcClient,
csimanager: csi,
updater: updater,
}
}

func (h *csiHook) shouldRun() bool {
tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup)
func (c *csiHook) shouldRun() bool {
tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
for _, vol := range tg.Volumes {
if vol.Type == structs.VolumeTypeCSI {
return true
Expand Down
234 changes: 234 additions & 0 deletions client/allocrunner/csi_hook_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package allocrunner

import (
"context"
"fmt"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/pluginmanager"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)

var _ interfaces.RunnerPrerunHook = (*csiHook)(nil)
var _ interfaces.RunnerPostrunHook = (*csiHook)(nil)

// TODO https://github.com/hashicorp/nomad/issues/11786
// we should implement Update as well
// var _ interfaces.RunnerUpdateHook = (*csiHook)(nil)

func TestCSIHook(t *testing.T) {

alloc := mock.Alloc()
logger := testlog.HCLogger(t)

testcases := []struct {
name string
volumeRequests map[string]*structs.VolumeRequest
expectedMounts map[string]*csimanager.MountInfo
expectedMountCalls int
expectedUnmountCalls int
expectedClaimCalls int
expectedUnpublishCalls int
}{

{
name: "simple case",
volumeRequests: map[string]*structs.VolumeRequest{
"vol0": {
Name: "vol0",
Type: structs.VolumeTypeCSI,
Source: "testvolume0",
ReadOnly: true,
MountOptions: &structs.CSIMountOptions{},
},
},
expectedMounts: map[string]*csimanager.MountInfo{
"vol0": &csimanager.MountInfo{Source: fmt.Sprintf(
"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedMountCalls: 1,
expectedUnmountCalls: 0, // not until this is done client-side
expectedClaimCalls: 1,
expectedUnpublishCalls: 1,
},

{
name: "per-alloc case",
volumeRequests: map[string]*structs.VolumeRequest{
"vol0": {
Name: "vol0",
Type: structs.VolumeTypeCSI,
Source: "testvolume0",
ReadOnly: true,
MountOptions: &structs.CSIMountOptions{},
},
},
expectedMounts: map[string]*csimanager.MountInfo{
"vol0": &csimanager.MountInfo{Source: fmt.Sprintf(
"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedMountCalls: 1,
expectedUnmountCalls: 0, // not until this is done client-side
expectedClaimCalls: 1,
expectedUnpublishCalls: 1,
},

// TODO: this won't actually work on the client.
// https://github.com/hashicorp/nomad/issues/11798
//
// {
// name: "one source volume mounted read-only twice",
// volumeRequests: map[string]*structs.VolumeRequest{
// "vol0": {
// Name: "vol0",
// Type: structs.VolumeTypeCSI,
// Source: "testvolume0",
// ReadOnly: true,
// MountOptions: &structs.CSIMountOptions{},
// },
// "vol1": {
// Name: "vol1",
// Type: structs.VolumeTypeCSI,
// Source: "testvolume0",
// ReadOnly: false,
// MountOptions: &structs.CSIMountOptions{},
// },
// },
// expectedMounts: map[string]*csimanager.MountInfo{
// "vol0": &csimanager.MountInfo{Source: fmt.Sprintf(
// "test-alloc-dir/%s/testvolume0/ro-file-system-multi-node-reader-only", alloc.ID)},
// "vol1": &csimanager.MountInfo{Source: fmt.Sprintf(
// "test-alloc-dir/%s/testvolume0/ro-file-system-multi-node-reader-only", alloc.ID)},
// },
// expectedMountCalls: 1,
// expectedUnmountCalls: 0, // not until this is done client-side
// expectedClaimCalls: 1,
// expectedUnpublishCalls: 1,
// },
}

for i := range testcases {
tc := testcases[i]
t.Run(tc.name, func(t *testing.T) {
alloc.Job.TaskGroups[0].Volumes = tc.volumeRequests

callCounts := map[string]int{}
mgr := mockPluginManager{mounter: mockVolumeMounter{callCounts: callCounts}}
rpcer := mockRPCer{alloc: alloc, callCounts: callCounts}
ar := mockAllocRunner{
res: &cstructs.AllocHookResources{},
caps: &drivers.Capabilities{
FSIsolation: drivers.FSIsolationChroot,
MountConfigs: drivers.MountConfigSupportAll,
},
}
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret")
require.NotNil(t, hook)

require.NoError(t, hook.Prerun())
mounts := ar.GetAllocHookResources().GetCSIMounts()
require.NotNil(t, mounts)
require.Equal(t, tc.expectedMounts, mounts)

require.NoError(t, hook.Postrun())
require.Equal(t, tc.expectedMountCalls, callCounts["mount"])
require.Equal(t, tc.expectedUnmountCalls, callCounts["unmount"])
require.Equal(t, tc.expectedClaimCalls, callCounts["claim"])
require.Equal(t, tc.expectedUnpublishCalls, callCounts["unpublish"])

})
}

}

// HELPERS AND MOCKS

func testVolume(id string) *structs.CSIVolume {
vol := structs.NewCSIVolume(id, 0)
vol.Schedulable = true
return vol
}

type mockRPCer struct {
alloc *structs.Allocation
callCounts map[string]int
}

// RPC mocks the server RPCs, acting as though any request succeeds
func (r mockRPCer) RPC(method string, args interface{}, reply interface{}) error {
switch method {
case "CSIVolume.Claim":
r.callCounts["claim"]++
req := args.(*structs.CSIVolumeClaimRequest)
vol := testVolume(req.VolumeID)
err := vol.Claim(req.ToClaim(), r.alloc)
if err != nil {
return err
}

resp := reply.(*structs.CSIVolumeClaimResponse)
resp.PublishContext = map[string]string{}
resp.Volume = vol
resp.QueryMeta = structs.QueryMeta{}
case "CSIVolume.Unpublish":
r.callCounts["unpublish"]++
resp := reply.(*structs.CSIVolumeUnpublishResponse)
resp.QueryMeta = structs.QueryMeta{}
default:
return fmt.Errorf("unexpected method")
}
return nil
}

type mockVolumeMounter struct {
callCounts map[string]int
}

func (vm mockVolumeMounter) MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *csimanager.UsageOptions, publishContext map[string]string) (*csimanager.MountInfo, error) {
vm.callCounts["mount"]++
return &csimanager.MountInfo{
Source: filepath.Join("test-alloc-dir", alloc.ID, vol.ID, usageOpts.ToFS()),
}, nil
}
func (vm mockVolumeMounter) UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usageOpts *csimanager.UsageOptions) error {
vm.callCounts["unmount"]++
return nil
}

type mockPluginManager struct {
mounter mockVolumeMounter
}

func (mgr mockPluginManager) MounterForPlugin(ctx context.Context, pluginID string) (csimanager.VolumeMounter, error) {
return mgr.mounter, nil
}

// no-op methods to fulfill the interface
func (mgr mockPluginManager) PluginManager() pluginmanager.PluginManager { return nil }
func (mgr mockPluginManager) Shutdown() {}

type mockAllocRunner struct {
res *cstructs.AllocHookResources
caps *drivers.Capabilities
}

func (ar mockAllocRunner) GetAllocHookResources() *cstructs.AllocHookResources {
return ar.res
}

func (ar mockAllocRunner) SetAllocHookResources(res *cstructs.AllocHookResources) {
ar.res = res
}

func (ar mockAllocRunner) GetTaskDriverCapabilities(taskName string) (*drivers.Capabilities, error) {
return ar.caps, nil
}

0 comments on commit 37dea7c

Please sign in to comment.