diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index ef9323e96..5ea2be989 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -903,7 +903,12 @@ func (cs *controller) GetCapacity( var availableCapacity int64 for _, nodeName := range nodeNames { - v, exists, err := zfsNodesCache.GetByKey(zfs.OpenEBSNamespace + "/" + nodeName) + mappedNodeId, mapErr := zfs.GetNodeID(nodeName) + if mapErr != nil { + klog.Warningf("Unable to find mapped node id for %s", nodeName) + mappedNodeId = nodeName + } + v, exists, err := zfsNodesCache.GetByKey(zfs.OpenEBSNamespace + "/" + mappedNodeId) if err != nil { klog.Warning("unexpected error after querying the zfsNode informer cache") continue diff --git a/pkg/mgmt/zfsnode/start.go b/pkg/mgmt/zfsnode/start.go index 395ae4f92..cf7b709fb 100644 --- a/pkg/mgmt/zfsnode/start.go +++ b/pkg/mgmt/zfsnode/start.go @@ -18,6 +18,11 @@ package zfsnode import ( "context" + "fmt" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/klog/v2" "sync" "time" @@ -27,7 +32,6 @@ import ( "github.com/openebs/zfs-localpv/pkg/zfs" "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes" ) @@ -60,10 +64,26 @@ func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { options.FieldSelector = fields.OneTermEqualSelector("metadata.name", zfs.NodeID).String() })) - k8sNode, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), zfs.NodeID, metav1.GetOptions{}) + topologyRequirement, requirementError := labels.NewRequirement(zfs.ZFSTopologyKey, selection.Equals, []string{zfs.NodeID}) + if requirementError != nil { + return errors.Wrapf(requirementError, "Unable to retrieve node by %s for node id %s", zfs.ZFSTopologyKey, zfs.NodeID) + } + topologySelector := labels.NewSelector().Add(*topologyRequirement).String() + klog.Infof("The topology selector is %s", topologySelector) + + k8sNodeCandidates, err := kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ + LabelSelector: topologySelector, + }) + if err != nil { return errors.Wrapf(err, "fetch k8s node %s", zfs.NodeID) } + + if k8sNodeCandidates == nil || len(k8sNodeCandidates.Items) != 1 { + return fmt.Errorf("unable to retrieve a single node by %s for %s", zfs.ZFSTopologyKey, zfs.NodeID) + } + + k8sNode := k8sNodeCandidates.Items[0] isTrue := true // as object returned by client go clears all TypeMeta from it. nodeGVK := &schema.GroupVersionKind{