Skip to content

Commit

Permalink
Merge pull request #939 from huww98/local-deleted-node
Browse files Browse the repository at this point in the history
local: allow volume to be removed from deleted node
  • Loading branch information
k8s-ci-robot authored Jan 8, 2024
2 parents 9281cd3 + 77c8842 commit 413e687
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 63 deletions.
27 changes: 7 additions & 20 deletions pkg/local/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"strings"
"time"

"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/local/lib"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/local/manager"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"net"
"strings"
"time"
)

// Connection lvm connection interface
Expand Down Expand Up @@ -103,30 +103,17 @@ func connect(address string, timeout time.Duration, creds credentials.TransportC
grpc.WithTransportCredentials(creds),
grpc.WithBackoffMaxDelay(time.Second),
grpc.WithUnaryInterceptor(logGRPC),
grpc.WithBlock(),
}
if strings.HasPrefix(address, "/") {
dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}))
}
conn, err := grpc.Dial(address, dialOptions...)

if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
if !conn.WaitForStateChange(ctx, conn.GetState()) {
log.Warnf("Connection to %s timed out", address)
return conn, nil // return nil, subsequent GetPluginInfo will show the real connection error
}
if conn.GetState() == connectivity.Ready {
log.Warnf("Connected to %s", address)
return conn, nil
}
log.Infof("Still trying to connect %s, connection is %s", address, conn.GetState())
}

return grpc.DialContext(ctx, address, dialOptions...)
}

func (c *workerConnection) CreateLoopDevice(ctx context.Context, pvName, quotaSize string) (string, error) {
Expand Down
87 changes: 63 additions & 24 deletions pkg/local/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
)

type controllerServer struct {
*csicommon.DefaultControllerServer
client kubernetes.Interface
recorder record.EventRecorder
caCertFile string
clientCertFile string
clientKeyFile string
Expand Down Expand Up @@ -100,6 +103,7 @@ func newControllerServer(d *csicommon.CSIDriver, caCertFile string, clientCertFi
return &controllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
client: types.GlobalConfigVar.KubeClient,
recorder: utils.NewEventRecorder(),
caCertFile: caCertFile,
clientCertFile: clientCertFile,
clientKeyFile: clientKeyFile,
Expand Down Expand Up @@ -444,13 +448,36 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
// getNodeConn opens a gRPC connection to the lvmd port on nodeSelected.
//
// The node address is resolved with utils.GetNodeAddr. If the first dial
// fails with context.DeadlineExceeded, the cached node IP is cleared and the
// address is resolved again: when the address differs, one more connection
// attempt is made against the new address; when it is unchanged, the original
// dial error is returned.
//
// Errors from utils.GetNodeAddr are returned as-is; DeleteVolume inspects
// them with apierrors.IsNotFound.
func (cs *controllerServer) getNodeConn(nodeSelected string, caCertFile string, clientCertFile string, clientKeyFile string) (client.Connection, error) {
	addr, err := utils.GetNodeAddr(cs.client, nodeSelected, server.GetLvmdPort())
	if err != nil {
		log.Errorf("Get node %s address with error: %s", nodeSelected, err.Error())
		return nil, err
	}
	conn, err := client.NewGrpcConnection(addr, connectTimeout, caCertFile, clientCertFile, clientKeyFile)
	if !errors.Is(err, context.DeadlineExceeded) {
		return conn, err
	}

	// Node IP may be changed, try to get new IP
	utils.ClearNodeIPCache(nodeSelected)
	// Keep the lookup error in its own variable. Shadowing err here would make
	// the "address unchanged" branch below return a nil error together with a
	// nil connection.
	addr2, addrErr := utils.GetNodeAddr(cs.client, nodeSelected, server.GetLvmdPort())
	if addrErr != nil {
		log.Errorf("Get node %s address without cache failed: %s", nodeSelected, addrErr.Error())
		return nil, addrErr
	}
	if addr2 == addr {
		log.Errorf("Node %s address %s verified from Kubernetes, but failed to connect", nodeSelected, addr)
		// err is still the DeadlineExceeded error from the first dial.
		return nil, err
	}
	log.Infof("Node %s address changed from %s to %s, re-connecting", nodeSelected, addr, addr2)
	return client.NewGrpcConnection(addr2, connectTimeout, caCertFile, clientCertFile, clientKeyFile)
}

// handleNodeDeleted is used by DeleteVolume when the node that held the volume
// has been deleted. It logs that the deletion is skipped, records a Warning
// event on the PV noting that data may remain at volumeDesc on nodeName, and
// returns an empty response with a nil error.
func (cs *controllerServer) handleNodeDeleted(pvObj *v1.PersistentVolume, volumeDesc, nodeName string) (*csi.DeleteVolumeResponse, error) {
	const (
		eventReason    = "VolumeOnDeletedNode"
		eventMessageFn = "Volume data may still present in %s at deleted node %s"
	)

	log.Infof("DeleteVolume: Node %s deleted, skipping delete volume %s (%s)", nodeName, pvObj.Name, volumeDesc)
	cs.recorder.Eventf(pvObj, v1.EventTypeWarning, eventReason, eventMessageFn, volumeDesc, nodeName)

	return new(csi.DeleteVolumeResponse), nil
}

// DeleteVolume csi interface
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
log.Infof("DeleteVolume: deleting local volume: %s", req.GetVolumeId())
Expand All @@ -474,6 +501,9 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
if types.GlobalConfigVar.GrpcProvision && nodeName != "" {
conn, err := cs.getNodeConn(nodeName, cs.caCertFile, cs.clientCertFile, cs.clientKeyFile)
if err != nil {
if apierrors.IsNotFound(err) {
return cs.handleNodeDeleted(pvObj, fmt.Sprintf("LVM logical volume %s/%s", vgName, volumeID), nodeName)
}
log.Errorf("DeleteVolume: New lvm %s Connection at node %s with error: %s", req.GetVolumeId(), nodeName, err.Error())
return nil, err
}
Expand Down Expand Up @@ -536,20 +566,20 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, errors.New("MountPoint Pv is illegal, No node info")
}
nodeName := nodes[0]
path := pvObj.Spec.CSI.VolumeAttributes[MountPointType]
if path == "" {
log.Errorf("DeleteVolume: Get MountPoint Path for volume %s, with empty", volumeID)
return nil, errors.New("MountPoint Path is empty")
}
conn, err := cs.getNodeConn(nodeName, cs.caCertFile, cs.clientCertFile, cs.clientKeyFile)
if err != nil {
if apierrors.IsNotFound(err) {
return cs.handleNodeDeleted(pvObj, fmt.Sprintf("mount point %s", path), nodeName)
}
log.Errorf("DeleteVolume: New mountpoint %s Connection error: %s", req.GetVolumeId(), err.Error())
return nil, err
}
defer conn.Close()
path := ""
if value, ok := pvObj.Spec.CSI.VolumeAttributes[MountPointType]; ok {
path = value
}
if path == "" {
log.Errorf("DeleteVolume: Get MountPoint Path for volume %s, with empty", volumeID)
return nil, errors.New("MountPoint Path is empty")
}
if err := conn.CleanPath(ctx, path); err != nil {
log.Errorf("DeleteVolume: Remove mountpoint for %s with error: %s", req.GetVolumeId(), err.Error())
return nil, errors.New("DeleteVolume: Delete mountpoint Failed: " + err.Error())
Expand All @@ -560,17 +590,20 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
log.Infof("DeleteVolume: successful delete Device volume(%s)...", volumeID)
case PmemVolumeType:
if nodeName != "" {
namespace, ok := pvObj.Spec.CSI.VolumeAttributes["pmemNameSpace"]
if !ok {
log.Errorf("DeleteVolume: Direct PMEM volume can not found NameSpace: %s", volumeID)
return nil, errors.New("DeleteVolume Direct PMEM volume can not found NameSpace " + volumeID)
}
conn, err := cs.getNodeConn(nodeName, cs.caCertFile, cs.clientCertFile, cs.clientKeyFile)
if err != nil {
if apierrors.IsNotFound(err) {
return cs.handleNodeDeleted(pvObj, fmt.Sprintf("PMEM namespace %s", namespace), nodeName)
}
log.Errorf("DeleteVolume: New PMEM %s Connection at node %s with error: %s", req.GetVolumeId(), nodeName, err.Error())
return nil, err
}
defer conn.Close()
if _, ok := pvObj.Spec.CSI.VolumeAttributes["pmemNameSpace"]; !ok {
log.Errorf("DeleteVolume: Direct PMEM volume can not found NameSpace: %s", volumeID)
return nil, errors.New("DeleteVolume Direct PMEM volume can not found NameSpace " + volumeID)
}
namespace := pvObj.Spec.CSI.VolumeAttributes["pmemNameSpace"]
if pmemName, err := conn.GetNameSpace(ctx, "", volumeID); err == nil && pmemName != "" {
if err := conn.DeleteNameSpace(ctx, namespace); err != nil {
log.Errorf("DeleteVolume: Remove PMEM direct volume %s at node %s with error: %s", volumeID, nodeName, err.Error())
Expand All @@ -588,18 +621,21 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
}
case QuotaPathVolumeType:
if nodeName != "" {
quotaPath, ok := pvObj.Spec.CSI.VolumeAttributes[ProjQuotaFullPath]
if !ok {
log.Errorf("DeleteVolume: QuotaPath volume %s not have projQuotaFullPath parameter", req.VolumeId)
return nil, fmt.Errorf("DeleteVolume: QuotaPath volume %s not have projQuotaFullPath parameter", req.VolumeId)
}
conn, err := cs.getNodeConn(nodeName, cs.caCertFile, cs.clientCertFile, cs.clientKeyFile)
if err != nil {
if apierrors.IsNotFound(err) {
return cs.handleNodeDeleted(pvObj, fmt.Sprintf("path %s", quotaPath), nodeName)
}
log.Errorf("DeleteVolume: get QuotaPath volume %s Connection at node %s with error: %s", req.VolumeId, nodeName, err.Error())
return nil, err
}
defer conn.Close()

if _, ok := pvObj.Spec.CSI.VolumeAttributes[ProjQuotaFullPath]; !ok {
log.Errorf("DeleteVolume: QuotaPath volume %s not have projQuotaFullPath parameter", req.VolumeId)
return nil, fmt.Errorf("DeleteVolume: QuotaPath volume %s not have projQuotaFullPath parameter", req.VolumeId)
}
quotaPath := pvObj.Spec.CSI.VolumeAttributes[ProjQuotaFullPath]
_, err = conn.RemoveProjQuotaSubpath(ctx, quotaPath)
if err != nil {
log.Errorf("DeleteVolume: Remove QuotaPath volume %s at node %s with error %s", req.VolumeId, nodeName, err.Error())
Expand All @@ -611,18 +647,21 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
}
case LoopDeviceVolumeType:
if nodeName != "" {
loopdevicePath, ok := pvObj.Spec.CSI.VolumeAttributes[LoopDeviceFullPath]
if !ok {
log.Errorf("DeleteVolume: LoopDevice volume %s not have loopDeviceFullPath parameter", req.VolumeId)
return nil, fmt.Errorf("DeleteVolume: loopdevice volume %s not have loopDeviceFullPath parameter", req.VolumeId)
}
conn, err := cs.getNodeConn(nodeName, cs.caCertFile, cs.clientCertFile, cs.clientKeyFile)
if err != nil {
if apierrors.IsNotFound(err) {
return cs.handleNodeDeleted(pvObj, fmt.Sprintf("loop device path %s", loopdevicePath), nodeName)
}
log.Errorf("DeleteVolume: get loopdevice volume %s Connection at node %s with error: %s", req.VolumeId, nodeName, err.Error())
return nil, err
}
defer conn.Close()

if _, ok := pvObj.Spec.CSI.VolumeAttributes[LoopDeviceFullPath]; !ok {
log.Errorf("DeleteVolume: LoopDevice volume %s not have loopDeviceFullPath parameter", req.VolumeId)
return nil, fmt.Errorf("DeleteVolume: loopdevice volume %s not have loopDeviceFullPath parameter", req.VolumeId)
}
loopdevicePath := pvObj.Spec.CSI.VolumeAttributes[LoopDeviceFullPath]
log.Errorf("DeleteVolume: LoopDevice volume associated with devices: %s", loopdevicePath)
_, err = conn.DeleteLoopDevice(ctx, pvObj.Name)
if err != nil {
Expand Down
31 changes: 12 additions & 19 deletions pkg/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,7 @@ const (
var KubernetesAlicloudIdentity = "Kubernetes.Alicloud/CsiPlugin"

var (
// NodeAddrMap map for NodeID and its Address
NodeAddrMap = map[string]string{}
// NodeAddrMutex Mutex for NodeAddr map
NodeAddrMutex sync.RWMutex
nodeAddrMap = sync.Map{}
)

// RoleAuth define STS Token Response
Expand Down Expand Up @@ -739,11 +736,8 @@ func Fsync(f *os.File) error {
return f.Sync()
}

// SetNodeAddrMap set map with mutex
func SetNodeAddrMap(key string, value string) {
NodeAddrMutex.Lock()
NodeAddrMap[key] = value
NodeAddrMutex.Unlock()
// ClearNodeIPCache removes the entry stored for nodeID in nodeAddrMap, so a
// later GetNodeIP call for that node no longer finds a cached value.
func ClearNodeIPCache(nodeID string) {
nodeAddrMap.Delete(nodeID)
}

// GetNodeAddr returns the node's address in host:port form.
Expand All @@ -752,13 +746,13 @@ func GetNodeAddr(client kubernetes.Interface, node string, port string) (string,
if err != nil {
return "", err
}
return ip.String() + ":" + port, nil
return net.JoinHostPort(ip.String(), port), nil
}

// GetNodeIP returns the node's IP address, using the cached value when one exists.
func GetNodeIP(client kubernetes.Interface, nodeID string) (net.IP, error) {
if value, ok := NodeAddrMap[nodeID]; ok && value != "" {
return net.ParseIP(value), nil
if value, ok := nodeAddrMap.Load(nodeID); ok {
return value.(net.IP), nil
}
node, err := client.CoreV1().Nodes().Get(context.Background(), nodeID, metav1.GetOptions{})
if err != nil {
Expand All @@ -769,13 +763,12 @@ func GetNodeIP(client kubernetes.Interface, nodeID string) (net.IP, error) {
for i := range addresses {
addressMap[addresses[i].Type] = append(addressMap[addresses[i].Type], addresses[i])
}
if addresses, ok := addressMap[v1.NodeInternalIP]; ok {
SetNodeAddrMap(nodeID, addresses[0].Address)
return net.ParseIP(addresses[0].Address), nil
}
if addresses, ok := addressMap[v1.NodeExternalIP]; ok {
SetNodeAddrMap(nodeID, addresses[0].Address)
return net.ParseIP(addresses[0].Address), nil
for _, t := range []v1.NodeAddressType{v1.NodeInternalIP, v1.NodeExternalIP} {
if addresses, ok := addressMap[t]; ok {
ip := net.ParseIP(addresses[0].Address)
nodeAddrMap.Store(nodeID, ip)
return ip, nil
}
}
return nil, fmt.Errorf("Node IP unknown; known addresses: %v", addresses)
}
Expand Down

0 comments on commit 413e687

Please sign in to comment.