Skip to content

Commit

Permalink
CSI: volume and plugin allocations in the API (#8590)
Browse files Browse the repository at this point in the history
* command/agent/csi_endpoint: explicitly convert to API structs, and convert allocs for single object get endpoints
  • Loading branch information
langmartin authored Aug 11, 2020
1 parent 8888ab3 commit 8a095fc
Show file tree
Hide file tree
Showing 4 changed files with 430 additions and 17 deletions.
2 changes: 1 addition & 1 deletion api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,7 @@ type TaskEvent struct {
Time int64
DisplayMessage string
Details map[string]string
Message string
// DEPRECATION NOTICE: The following fields are all deprecated. see TaskEvent struct in structs.go for details.
FailsTask bool
RestartReason string
Expand All @@ -933,7 +934,6 @@ type TaskEvent struct {
DriverMessage string
ExitCode int
Signal int
Message string
KillReason string
KillTimeout time.Duration
KillError string
Expand Down
321 changes: 310 additions & 11 deletions command/agent/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import (
"strconv"
"strings"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/structs"
)

func (s *HTTPServer) CSIVolumesRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "GET" {
switch req.Method {
case http.MethodPut, http.MethodPost:
return s.csiVolumePut(resp, req)
case http.MethodGet:
default:
return nil, CodedError(405, ErrInvalidMethod)
}

Expand Down Expand Up @@ -61,7 +66,7 @@ func (s *HTTPServer) CSIVolumeSpecificRequest(resp http.ResponseWriter, req *htt
case http.MethodGet:
return s.csiVolumeGet(id, resp, req)
case http.MethodPut:
return s.csiVolumePut(id, resp, req)
return s.csiVolumePut(resp, req)
case http.MethodDelete:
return s.csiVolumeDelete(id, resp, req)
default:
Expand Down Expand Up @@ -100,15 +105,20 @@ func (s *HTTPServer) csiVolumeGet(id string, resp http.ResponseWriter, req *http
return nil, CodedError(404, "volume not found")
}

vol := structsCSIVolumeToApi(out.Volume)

// remove sensitive fields, as our redaction mechanism doesn't
// help serializing here
out.Volume.Secrets = nil
out.Volume.MountOptions = nil
return out.Volume, nil
vol.Secrets = nil
vol.MountOptions = nil

return vol, nil
}

func (s *HTTPServer) csiVolumePut(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
func (s *HTTPServer) csiVolumePut(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
switch req.Method {
case http.MethodPost, http.MethodPut:
default:
return nil, CodedError(405, ErrInvalidMethod)
}

Expand All @@ -133,7 +143,7 @@ func (s *HTTPServer) csiVolumePut(id string, resp http.ResponseWriter, req *http
}

func (s *HTTPServer) csiVolumeDelete(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "DELETE" {
if req.Method != http.MethodDelete {
return nil, CodedError(405, ErrInvalidMethod)
}

Expand Down Expand Up @@ -193,7 +203,7 @@ func (s *HTTPServer) csiVolumeDetach(id string, resp http.ResponseWriter, req *h

// CSIPluginsRequest lists CSI plugins
func (s *HTTPServer) CSIPluginsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "GET" {
if req.Method != http.MethodGet {
return nil, CodedError(405, ErrInvalidMethod)
}

Expand Down Expand Up @@ -225,7 +235,7 @@ func (s *HTTPServer) CSIPluginsRequest(resp http.ResponseWriter, req *http.Reque

// CSIPluginSpecificRequest list the job with CSIInfo
func (s *HTTPServer) CSIPluginSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "GET" {
if req.Method != http.MethodGet {
return nil, CodedError(405, ErrInvalidMethod)
}

Expand All @@ -252,5 +262,294 @@ func (s *HTTPServer) CSIPluginSpecificRequest(resp http.ResponseWriter, req *htt
return nil, CodedError(404, "plugin not found")
}

return out.Plugin, nil
return structsCSIPluginToApi(out.Plugin), nil
}

// structsCSIPluginToApi converts CSIPlugin, setting Expected the count of known plugin
// instances
func structsCSIPluginToApi(plug *structs.CSIPlugin) *api.CSIPlugin {
out := &api.CSIPlugin{
ID: plug.ID,
Provider: plug.Provider,
Version: plug.Version,
Allocations: make([]*api.AllocationListStub, len(plug.Allocations)),
ControllerRequired: plug.ControllerRequired,
ControllersHealthy: plug.ControllersHealthy,
ControllersExpected: len(plug.Controllers),
Controllers: make(map[string]*api.CSIInfo),
NodesHealthy: plug.NodesHealthy,
NodesExpected: len(plug.Nodes),
Nodes: make(map[string]*api.CSIInfo),
CreateIndex: plug.CreateIndex,
ModifyIndex: plug.ModifyIndex,
}

for k, v := range plug.Controllers {
out.Controllers[k] = structsCSIInfoToApi(v)
}

for k, v := range plug.Nodes {
out.Nodes[k] = structsCSIInfoToApi(v)
}

for _, a := range plug.Allocations {
out.Allocations = append(out.Allocations, structsAllocListStubToApi(a))
}

return out
}

// structsCSIVolumeToApi converts CSIVolume, creating the allocation array
func structsCSIVolumeToApi(vol *structs.CSIVolume) *api.CSIVolume {
out := &api.CSIVolume{
ID: vol.ID,
Name: vol.Name,
ExternalID: vol.ExternalID,
Namespace: vol.Namespace,
Topologies: structsCSITopolgiesToApi(vol.Topologies),
AccessMode: structsCSIAccessModeToApi(vol.AccessMode),
AttachmentMode: structsCSIAttachmentModeToApi(vol.AttachmentMode),
MountOptions: structsCSIMountOptionsToApi(vol.MountOptions),
Secrets: structsCSISecretsToApi(vol.Secrets),
Parameters: vol.Parameters,
Context: vol.Context,

// Allocations is the collapsed list of both read and write allocs
Allocations: []*api.AllocationListStub{},

Schedulable: vol.Schedulable,
PluginID: vol.PluginID,
Provider: vol.Provider,
ProviderVersion: vol.ProviderVersion,
ControllerRequired: vol.ControllerRequired,
ControllersHealthy: vol.ControllersHealthy,
ControllersExpected: vol.ControllersExpected,
NodesHealthy: vol.NodesHealthy,
NodesExpected: vol.NodesExpected,
ResourceExhausted: vol.ResourceExhausted,
CreateIndex: vol.CreateIndex,
ModifyIndex: vol.ModifyIndex,
}

for _, a := range vol.WriteAllocs {
out.Allocations = append(out.Allocations, structsAllocListStubToApi(a.Stub()))
}

for _, a := range vol.ReadAllocs {
out.Allocations = append(out.Allocations, structsAllocListStubToApi(a.Stub()))
}

return out
}

// structsCSIInfoToApi converts CSIInfo, part of CSIPlugin
func structsCSIInfoToApi(info *structs.CSIInfo) *api.CSIInfo {
out := &api.CSIInfo{
PluginID: info.PluginID,
Healthy: info.Healthy,
HealthDescription: info.HealthDescription,
UpdateTime: info.UpdateTime,
RequiresControllerPlugin: info.RequiresControllerPlugin,
RequiresTopologies: info.RequiresTopologies,
}

if info.ControllerInfo != nil {
out.ControllerInfo = &api.CSIControllerInfo{
SupportsReadOnlyAttach: info.ControllerInfo.SupportsReadOnlyAttach,
SupportsAttachDetach: info.ControllerInfo.SupportsAttachDetach,
SupportsListVolumes: info.ControllerInfo.SupportsListVolumes,
SupportsListVolumesAttachedNodes: info.ControllerInfo.SupportsListVolumesAttachedNodes,
}
}

if info.NodeInfo != nil {
out.NodeInfo = &api.CSINodeInfo{
ID: info.NodeInfo.ID,
MaxVolumes: info.NodeInfo.MaxVolumes,
RequiresNodeStageVolume: info.NodeInfo.RequiresNodeStageVolume,
}

if info.NodeInfo.AccessibleTopology != nil {
out.NodeInfo.AccessibleTopology = &api.CSITopology{}
out.NodeInfo.AccessibleTopology.Segments = info.NodeInfo.AccessibleTopology.Segments
}
}

return out
}

// structsAllocListStubToApi converts AllocListStub, for CSIPlugin
func structsAllocListStubToApi(alloc *structs.AllocListStub) *api.AllocationListStub {
out := &api.AllocationListStub{
ID: alloc.ID,
EvalID: alloc.EvalID,
Name: alloc.Name,
Namespace: alloc.Namespace,
NodeID: alloc.NodeID,
NodeName: alloc.NodeName,
JobID: alloc.JobID,
JobType: alloc.JobType,
JobVersion: alloc.JobVersion,
TaskGroup: alloc.TaskGroup,
DesiredStatus: alloc.DesiredStatus,
DesiredDescription: alloc.DesiredDescription,
ClientStatus: alloc.ClientStatus,
ClientDescription: alloc.ClientDescription,
FollowupEvalID: alloc.FollowupEvalID,
PreemptedAllocations: alloc.PreemptedAllocations,
PreemptedByAllocation: alloc.PreemptedByAllocation,
CreateIndex: alloc.CreateIndex,
ModifyIndex: alloc.ModifyIndex,
CreateTime: alloc.CreateTime,
ModifyTime: alloc.ModifyTime,
}

out.DeploymentStatus = structsAllocDeploymentStatusToApi(alloc.DeploymentStatus)
out.RescheduleTracker = structsRescheduleTrackerToApi(alloc.RescheduleTracker)

for k, v := range alloc.TaskStates {
out.TaskStates[k] = structsTaskStateToApi(v)
}

return out
}

// structsAllocDeploymentStatusToApi converts RescheduleTracker, part of AllocListStub
func structsAllocDeploymentStatusToApi(ads *structs.AllocDeploymentStatus) *api.AllocDeploymentStatus {
out := &api.AllocDeploymentStatus{
Healthy: ads.Healthy,
Timestamp: ads.Timestamp,
Canary: ads.Canary,
ModifyIndex: ads.ModifyIndex,
}
return out
}

// structsRescheduleTrackerToApi converts RescheduleTracker, part of AllocListStub
func structsRescheduleTrackerToApi(rt *structs.RescheduleTracker) *api.RescheduleTracker {
out := &api.RescheduleTracker{}

for _, e := range rt.Events {
out.Events = append(out.Events, &api.RescheduleEvent{
RescheduleTime: e.RescheduleTime,
PrevAllocID: e.PrevAllocID,
PrevNodeID: e.PrevNodeID,
})
}

return out
}

// structsTaskStateToApi converts TaskState, part of AllocListStub
func structsTaskStateToApi(ts *structs.TaskState) *api.TaskState {
out := &api.TaskState{
State: ts.State,
Failed: ts.Failed,
Restarts: ts.Restarts,
LastRestart: ts.LastRestart,
StartedAt: ts.StartedAt,
FinishedAt: ts.FinishedAt,
}

for _, te := range ts.Events {
out.Events = append(out.Events, structsTaskEventToApi(te))
}

return out
}

// structsTaskEventToApi converts TaskEvents, part of AllocListStub
func structsTaskEventToApi(te *structs.TaskEvent) *api.TaskEvent {
out := &api.TaskEvent{
Type: te.Type,
Time: te.Time,
DisplayMessage: te.DisplayMessage,
Details: te.Details,

// DEPRECATION NOTICE: The following fields are all deprecated. see TaskEvent struct in structs.go for details.
FailsTask: te.FailsTask,
RestartReason: te.RestartReason,
SetupError: te.SetupError,
DriverError: te.DriverError,
DriverMessage: te.DriverMessage,
ExitCode: te.ExitCode,
Signal: te.Signal,
Message: te.Message,
KillReason: te.KillReason,
KillTimeout: te.KillTimeout,
KillError: te.KillError,
StartDelay: te.StartDelay,
DownloadError: te.DownloadError,
ValidationError: te.ValidationError,
DiskLimit: te.DiskLimit,
FailedSibling: te.FailedSibling,
VaultError: te.VaultError,
TaskSignalReason: te.TaskSignalReason,
TaskSignal: te.TaskSignal,
GenericSource: te.GenericSource,
}

return out
}

// structsCSITopolgiesToApi converts topologies, part of structsCSIVolumeToApi
func structsCSITopolgiesToApi(tops []*structs.CSITopology) []*api.CSITopology {
out := make([]*api.CSITopology, 0, len(tops))
for _, t := range tops {
out = append(out, &api.CSITopology{
Segments: t.Segments,
})
}

return out
}

// structsCSIAccessModeToApi converts access mode, part of structsCSIVolumeToApi
func structsCSIAccessModeToApi(mode structs.CSIVolumeAccessMode) api.CSIVolumeAccessMode {
switch mode {
case structs.CSIVolumeAccessModeSingleNodeReader:
return api.CSIVolumeAccessModeSingleNodeReader
case structs.CSIVolumeAccessModeSingleNodeWriter:
return api.CSIVolumeAccessModeSingleNodeWriter
case structs.CSIVolumeAccessModeMultiNodeReader:
return api.CSIVolumeAccessModeMultiNodeReader
case structs.CSIVolumeAccessModeMultiNodeSingleWriter:
return api.CSIVolumeAccessModeMultiNodeSingleWriter
case structs.CSIVolumeAccessModeMultiNodeMultiWriter:
return api.CSIVolumeAccessModeMultiNodeMultiWriter
default:
}
return api.CSIVolumeAccessModeUnknown
}

// structsCSIAttachmentModeToApiModeToApi converts attachment mode, part of structsCSIVolumeToApi
func structsCSIAttachmentModeToApi(mode structs.CSIVolumeAttachmentMode) api.CSIVolumeAttachmentMode {
switch mode {
case structs.CSIVolumeAttachmentModeBlockDevice:
return api.CSIVolumeAttachmentModeBlockDevice
case structs.CSIVolumeAttachmentModeFilesystem:
return api.CSIVolumeAttachmentModeFilesystem
default:
}
return api.CSIVolumeAttachmentModeUnknown
}

// structsCSIMountOptionsToApi converts mount options, part of structsCSIVolumeToApi
func structsCSIMountOptionsToApi(opts *structs.CSIMountOptions) *api.CSIMountOptions {
if opts == nil {
return nil
}

return &api.CSIMountOptions{
FSType: opts.FSType,
MountFlags: opts.MountFlags,
}
}

func structsCSISecretsToApi(secrets structs.CSISecrets) api.CSISecrets {
out := make(api.CSISecrets, len(secrets))
for k, v := range secrets {
out[k] = v
}
return out
}
Loading

0 comments on commit 8a095fc

Please sign in to comment.