Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spc): add feature to limit overprovisioning of cstor volumes #1577

Merged
merged 6 commits into from
Jan 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 184 additions & 7 deletions pkg/algorithm/cstorpoolselect/v1alpha1/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,19 @@
package v1alpha1

import (
"errors"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog"
"strings"
"text/template"

apis "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
csp "github.com/openebs/maya/pkg/cstor/pool/v1alpha2"
cstorvolume "github.com/openebs/maya/pkg/cstor/volume/v1alpha1"
cstorvolumereplica "github.com/openebs/maya/pkg/cstor/volumereplica/v1alpha1"
cvr "github.com/openebs/maya/pkg/cstor/volumereplica/v1alpha1"
env "github.com/openebs/maya/pkg/env/v1alpha1"
spc "github.com/openebs/maya/pkg/storagepoolclaim/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -40,6 +46,7 @@ const (
// replicaAntiAffinty is the label key
// that refers to replica anti affinity policy
replicaAntiAffinityLabel labelKey = "openebs.io/replica-anti-affinity"
volumeCapacityLabel labelKey = "volume.kubernetes.io/capacity"
)

type annotationKey string
Expand Down Expand Up @@ -92,6 +99,11 @@ const (
// the policy that does a best effort to select
// the given host to place storage
preferScheduleOnHostAnnotationPolicy policyName = "prefer-schedule-on-host"

// overProvisioningPolicy is the name of
// the policy that selects the given pool to
// place storage according to overProvisioning policy
overProvisioningPolicy policyName = "overProvisioning"
)

// policy exposes contracts that need
Expand All @@ -103,6 +115,101 @@ type policy interface {
filter(*csp.CSPList) (*csp.CSPList, error)
}

// scheduleWithOverProvisioningAwareness is a pool
// selection implementation.
type scheduleWithOverProvisioningAwareness struct {
// overProvisioning field if true means over-provisioning is enabled or vice-versa.
overProvisioning bool
sonasingh46 marked this conversation as resolved.
Show resolved Hide resolved
// spcName is the name of the SPC to which the over-provisioning policy will be
// applied and volume will be created from CSPs of this SPC.
spcName string
// openebsNamespace is the namespace where OpenEBS is installed.
openebsNamespace string
// totalCapacity is the capacity of the incoming volume.
totalCapacity resource.Quantity
// err constains a list of error in if any while building this current structure.
err []error
}

// priority returns the priority of the
// policy implementation
func (p scheduleWithOverProvisioningAwareness) priority() priority {
return highPriority
}

// name returns the name of the policy
// implementation
func (p scheduleWithOverProvisioningAwareness) name() policyName {
return overProvisioningPolicy
}

// filter selects the pools available on the host
// for which the policy has been applied
func (p scheduleWithOverProvisioningAwareness) filter(pools *csp.CSPList) (*csp.CSPList, error) {
if len(p.err) > 0 {
return nil, errors.Errorf("failed to fetch overprovisioning details:%v", p.err)
}

if p.overProvisioning {
klog.Infof("Overprovisioning restriction policy not added as overprovisioning is enabled on spc %s", p.spcName)
return pools, nil
}

filteredPools := &csp.CSPList{Items: []*csp.CSP{}}
for _, pool := range pools.Items {
volCap, err := p.consumedCapacity(pool.Object)
if err != nil {
klog.Errorf("failed to get capacity consumed by existing volumes on pool %s:{%s} ", pool.Object.UID, err.Error())
continue
}
if pool.HasSpace(p.totalCapacity, volCap) {
filteredPools.Items = append(filteredPools.Items, pool)
sonasingh46 marked this conversation as resolved.
Show resolved Hide resolved
} else {
klog.V(2).Infof("Can't select CSP with UID %q: Required space not available: Policy %s", pool.Object.UID, overProvisioningPolicy)
}
}
return filteredPools, nil
}

// getAllVolumeCapacity returns the sum of total capacities of all the volumes
// present of the given CSP.
func (p *scheduleWithOverProvisioningAwareness) consumedCapacity(csp *apis.CStorPool) (resource.Quantity, error) {
var totalcapcity resource.Quantity
cstorVolumeMap := make(map[string]bool)
label := string(apis.CStorPoolKey) + "=" + csp.Name
cstorVolumeReplicaObjList, err := cstorvolumereplica.NewKubeclient().WithNamespace(p.openebsNamespace).List(metav1.ListOptions{LabelSelector: label})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does CVRs always stay in openebsNamespace?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

openebsNamespace is the namespace where openebs is installed. And CVRs always stay in a namespace where openebs is installed.

if err != nil {
return resource.Quantity{}, errors.Wrapf(err, "Failed to get total volume capacity for CSP %s", csp.Name)
}
for _, cvr := range cstorVolumeReplicaObjList.Items {
if cvr.Labels == nil {
return resource.Quantity{}, errors.Errorf("Failed to get total volume capacity for CSP %s: "+
"Missing labels in CVR %s: Want label %s to calculate total volume capacity", csp.UID, cvr.Name, volumeCapacityLabel)
}
cstorVolumeMap[cvr.Labels[string(apis.CStorVolumeKey)]] = true
}

for cv := range cstorVolumeMap {
cap, err := p.getCStorVolumeCapacity(cv)
if err != nil {
return resource.Quantity{}, errors.Wrapf(err, "failed to get capacity for cstorvolume %s", cv)
}
cap.Add(totalcapcity)
}

return totalcapcity, nil

}

// getCStorVolumeCapacity returns the capacity present on a CStorVolume CR.
func (p *scheduleWithOverProvisioningAwareness) getCStorVolumeCapacity(name string) (resource.Quantity, error) {
cv, err := cstorvolume.NewKubeclient().WithNamespace(p.openebsNamespace).Get(name, metav1.GetOptions{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does cv stays in openebsNamespace?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

if err != nil {
return resource.Quantity{}, errors.Wrapf(err, "failed to fetch cstorvolume %s", name)
}
return cv.Spec.Capacity, nil
}

// scheduleOnHost is a pool selection
// implementation
type scheduleOnHost struct {
Expand Down Expand Up @@ -374,7 +481,7 @@ type buildOption func(*selection)

func withDefaultSelection(s *selection) {
if string(s.mode) == "" {
s.mode = singleExection
s.mode = multiExecution
}
}

Expand Down Expand Up @@ -447,6 +554,74 @@ func PreferScheduleOnHostAnnotation(hostNameAnnotation string) buildOption {
}
}

// CapacityAwareProvisioning adds scheduleWithOverProvisioningAwareness as a policy
// to be used during pool selection.
func CapacityAwareProvisioning(values ...string) buildOption {
return func(s *selection) {
var err error
overProvisioningPolicy := &scheduleWithOverProvisioningAwareness{}

spcName := getSPCName(values...)
if strings.TrimSpace(spcName) == "" {
err = errors.New("Got empty storage pool claim from runtask")
overProvisioningPolicy.err = append(overProvisioningPolicy.err, err)

}
overProvisioningPolicy.spcName = spcName

volCapacity, err := getVolumeCapacity(values...)
if err != nil {
overProvisioningPolicy.err = append(overProvisioningPolicy.err, err)
}
overProvisioningPolicy.totalCapacity = volCapacity

// Get the namespace where OpenEBS is installed

openEBSnamespace := env.Get(env.OpenEBSNamespace)
overProvisioningPolicy.openebsNamespace = openEBSnamespace

if len(overProvisioningPolicy.err) == 0 {
spc, err := getSPC(spcName)
if err != nil {
overProvisioningPolicy.err = append(overProvisioningPolicy.err, err)
} else {
if spc.Spec.PoolSpec.OverProvisioning {
overProvisioningPolicy.overProvisioning = true
}
}
}
s.policies.add(overProvisioningPolicy)
}
}

func getSPCName(values ...string) string {

for _, val := range values {
if strings.Contains(val, string(apis.StoragePoolClaimCPK)) {
str := strings.Split(val, "=")
return str[1]
}
}
return ""
}

func getSPC(name string) (*apis.StoragePoolClaim, error) {
return spc.NewKubeClient().Get(name, metav1.GetOptions{})
}

func getVolumeCapacity(values ...string) (resource.Quantity, error) {
var capacity string
for _, val := range values {
if strings.Contains(val, string(volumeCapacityLabel)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where are we filling this label with total capacity?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok.. got it.. this is the capacity of the volume to be provisioned

str := strings.Split(val, "=")
capacity = str[1]
}
}

return resource.ParseQuantity(capacity)

}

// GetPolicies returns the appropriate selection
// policies based on the provided values
func GetPolicies(values ...string) []buildOption {
Expand All @@ -460,6 +635,7 @@ func GetPolicies(values ...string) []buildOption {
opts = append(opts, AntiAffinityLabel(val))
}
}
opts = append(opts, CapacityAwareProvisioning(values...))
return opts
}

Expand Down Expand Up @@ -527,10 +703,11 @@ func FilterPoolIDs(entries *csp.CSPList, opts []buildOption) ([]string, error) {
// go template functions
func TemplateFunctions() template.FuncMap {
return template.FuncMap{
"cspGetPolicies": GetPolicies,
"cspFilterPoolIDs": FilterPoolIDs,
"cspAntiAffinity": AntiAffinityLabel,
"cspPreferAntiAffinity": PreferAntiAffinityLabel,
"preferScheduleOnHost": PreferScheduleOnHostAnnotation,
"cspGetPolicies": GetPolicies,
"cspFilterPoolIDs": FilterPoolIDs,
"cspAntiAffinity": AntiAffinityLabel,
"cspPreferAntiAffinity": PreferAntiAffinityLabel,
"preferScheduleOnHost": PreferScheduleOnHostAnnotation,
"capacityAwareProvisioning": CapacityAwareProvisioning,
}
}
13 changes: 7 additions & 6 deletions pkg/algorithm/cstorpoolselect/v1alpha1/select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,15 @@ func fakeCSPListOk(uids ...string) *csp.CSPList {

func fakeCSPListScheduleOnHostOk(hostuid string, uids ...string) *csp.CSPList {
fakeMap := map[string]string{}
fakeCapacityMap := map[string]string{}
for _, uid := range uids {
if hostuid == uid {
fakeMap[hostuid] = fakeValidHost
} else {
fakeMap[uid] = fakeinvalidHost
}
}
return csp.ListBuilder().WithUIDNode(fakeMap).List()
return csp.ListBuilder().WithUIDNode(fakeMap, fakeCapacityMap).List()
}

func fakeCVRListOk(uids ...string) cvrListFn {
Expand Down Expand Up @@ -528,7 +529,7 @@ func TestTemplateFunctionsCount(t *testing.T) {
tests := map[string]struct {
expectedLength int
}{
"Test 1": {5},
"Test 1": {6},
}

for name, test := range tests {
Expand Down Expand Up @@ -636,10 +637,10 @@ func TestGetPolicies(t *testing.T) {
selectors []string
expectedBuildoptions []buildOption
}{
"Test 1": {selectors: []string{}, expectedBuildoptions: []buildOption{}},
"Test 2": {selectors: []string{fakeAntiAffinitySelector}, expectedBuildoptions: []buildOption{AntiAffinityLabel(fakeAntiAffinitySelector)}},
"Test 3": {selectors: []string{fakePreferAntiAffinitySelector}, expectedBuildoptions: []buildOption{PreferAntiAffinityLabel(fakePreferAntiAffinitySelector)}},
"Test 4": {selectors: []string{fakePreferScheduleOnHostSelector}, expectedBuildoptions: []buildOption{PreferScheduleOnHostAnnotation(fakePreferScheduleOnHostSelector)}},
"Test 1": {selectors: []string{}, expectedBuildoptions: []buildOption{CapacityAwareProvisioning()}},
"Test 2": {selectors: []string{fakeAntiAffinitySelector}, expectedBuildoptions: []buildOption{AntiAffinityLabel(fakeAntiAffinitySelector), CapacityAwareProvisioning()}},
"Test 3": {selectors: []string{fakePreferAntiAffinitySelector}, expectedBuildoptions: []buildOption{PreferAntiAffinityLabel(fakePreferAntiAffinitySelector), CapacityAwareProvisioning()}},
"Test 4": {selectors: []string{fakePreferScheduleOnHostSelector}, expectedBuildoptions: []buildOption{PreferScheduleOnHostAnnotation(fakePreferScheduleOnHostSelector), CapacityAwareProvisioning()}},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/openebs.io/v1alpha1/cas_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ const (
// PredecessorBDKey is the key to fetch the predecessor BD in case of
// block device replacement.
PredecessorBDKey = "openebs.io/bd-predecessor"

// CStorPoolKey is the key to fetch the CVRs on a specific
// CStor pool. This key is present on CVR labels
CStorPoolKey CASKey = "cstorpool.openebs.io/name"

// cstorVolumeKey is the key to getch CStorVolume CR of a
// CVR. This key is present on CVR label.
CStorVolumeKey CASKey = "cstorvolume.openebs.io/name"
)

// CASPlainKey represents a openebs key used either in resource annotation
Expand Down
Loading