Skip to content

Commit

Permalink
feat(spc): add feature to limit overprovisioning of cstor volumes
Browse files Browse the repository at this point in the history
Signed-off-by: Ashutosh Kumar <ashutosh.kumar@openebs.io>
  • Loading branch information
Ashutosh Kumar committed Jan 7, 2020
1 parent 77442f2 commit 97a8412
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 70 deletions.
182 changes: 175 additions & 7 deletions pkg/algorithm/cstorpoolselect/v1alpha1/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
package v1alpha1

import (
"errors"
"fmt"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/resource"
"strings"
"text/template"

apis "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
csp "github.com/openebs/maya/pkg/cstor/pool/v1alpha2"
cstorvolumereplica "github.com/openebs/maya/pkg/cstor/volumereplica/v1alpha1"
cstorvolume "github.com/openebs/maya/pkg/cstor/volume/v1alpha1"
cvr "github.com/openebs/maya/pkg/cstor/volumereplica/v1alpha1"
spc "github.com/openebs/maya/pkg/storagepoolclaim/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -51,6 +56,12 @@ const (
scheduleOnHostAnnotation annotationKey = "volume.kubernetes.io/selected-node"
)

type overProvisioningKey string

const (
overProvisioningOnSPC overProvisioningKey = "overProvisioning"
)

type priority int

const (
Expand Down Expand Up @@ -92,6 +103,11 @@ const (
// the policy that does a best effort to select
// the given host to place storage
preferScheduleOnHostAnnotationPolicy policyName = "prefer-schedule-on-host"

// overProvisioningPolicy is the name of
// the policy that selects the given pool to
// place storage according to overProvisioning policy
overProvisioningPolicy policyName = "overProvisioning"
)

// policy exposes contracts that need
Expand All @@ -103,6 +119,84 @@ type policy interface {
filter(*csp.CSPList) (*csp.CSPList, error)
}

// scheduleWithOverProvisioningAwareness is a pool
// selection implementation.
type scheduleWithOverProvisioningAwareness struct {
overProvisioning bool
spcName string
capacity resource.Quantity
err []error
}

// priority returns the priority of the
// policy implementation
func (p scheduleWithOverProvisioningAwareness) priority() priority {
return highPriority
}

// name returns the name of the policy
// implementation
func (p scheduleWithOverProvisioningAwareness) name() policyName {
return overProvisioningPolicy
}

// filter selects the pools available on the host
// for which the policy has been applied
func (p scheduleWithOverProvisioningAwareness) filter(pools *csp.CSPList) (*csp.CSPList, error) {
if len(p.err) > 0 {
return nil, errors.Errorf("failed to fetch overprovisioning details:%v", p.err)
}
var filteredPools *csp.CSPList
for _, pool := range pools.Items {
volCap, err := getAllVolumeCapacity(pool.Object)
if err != nil {
return nil, errors.Wrapf(err,"failed to get capacity consumed by existing volumes on pool %s ",pool.Object.Name)
}
if pool.HasSpace(p.capacity, volCap) {
filteredPools.Items = append(filteredPools.Items, pool)
}
}
return filteredPools, nil
}

// getAllVolumeCapacity returns the sum of total capacities of all the volumes
// present of the given CSP.
func getAllVolumeCapacity(csp *apis.CStorPool) (resource.Quantity, error) {
var totalcapcity resource.Quantity
cstorVolumeMap := make(map[string]bool)
label := string(apis.CStorPoolKey) + "=" + csp.Name
cstorVolumeReplicaObjList, err := cstorvolumereplica.NewKubeclient().WithNamespace("openebs").List(metav1.ListOptions{LabelSelector: label})
if err != nil {
return resource.Quantity{}, errors.Wrapf(err,"error in listing all cvr resources present on %s csp",csp.Name)
}
for _, cvr := range cstorVolumeReplicaObjList.Items {
if cvr.Labels==nil{
return resource.Quantity{}, errors.Errorf("No labels found on cvr %s",cvr.Name)
}
cstorVolumeMap[cvr.Labels[string(apis.CStorVolumeKey)]] = true
}

for cv,_:=range cstorVolumeMap{
cap,err:=getCStorVolumeCapacity(cv)
if err!=nil{
return resource.Quantity{}, errors.Wrapf(err,"failed to get capacity for cstorvolume %s",cv)
}
cap.Add(totalcapcity)
}

return totalcapcity, nil

}

// getCStorVolumeCapacity returns the capacity present on a CStorVolume CR.
func getCStorVolumeCapacity(name string)(resource.Quantity,error) {
cv,err:=cstorvolume.NewKubeclient().WithNamespace("openebs").Get(name,metav1.GetOptions{})
if err!=nil{
return resource.Quantity{}, errors.Wrapf(err,"failed to fetch cstorvolume %s",name)
}
return cv.Spec.Capacity, nil
}

// scheduleOnHost is a pool selection
// implementation
type scheduleOnHost struct {
Expand Down Expand Up @@ -374,7 +468,7 @@ type buildOption func(*selection)

func withDefaultSelection(s *selection) {
if string(s.mode) == "" {
s.mode = singleExection
s.mode = multiExecution
}
}

Expand Down Expand Up @@ -447,6 +541,68 @@ func PreferScheduleOnHostAnnotation(hostNameAnnotation string) buildOption {
}
}

// OverProvisioning adds scheduleWithOverProvisioningAwareness as a policy
// to be used during pool selection.
func OverProvisioning(values ...string) buildOption {
return func(s *selection) {
var err error
overProvisioningPolicy := &scheduleWithOverProvisioningAwareness{}

spcName := getSPCName(values...)
if strings.TrimSpace(spcName) == "" {
err = errors.New("Got empty storage pool claim from runtask")
overProvisioningPolicy.err = append(overProvisioningPolicy.err, err)

}
overProvisioningPolicy.spcName = spcName

volCapacity, err := getVolumeCapacity(values...)
if err != nil {
overProvisioningPolicy.err = append(overProvisioningPolicy.err, err)
}
overProvisioningPolicy.capacity = volCapacity

if len(overProvisioningPolicy.err) == 0 {
spc, err := getSPC(spcName)
if err != nil {
overProvisioningPolicy.err = append(overProvisioningPolicy.err, err)
} else {
if spc.Spec.PoolSpec.OverProvisioning {
overProvisioningPolicy.overProvisioning = true
}
}
}
s.policies.add(overProvisioningPolicy)
}
}

func getSPCName(values ...string) string {
for _, val := range values {
if strings.Contains(val, string("openebs.io/storage-pool-claim")) {
str := strings.Split(val, "=")
return str[1]
}
}
return ""
}

func getSPC(name string) (*apis.StoragePoolClaim, error) {
return spc.NewKubeClient().Get(name, metav1.GetOptions{})
}

func getVolumeCapacity(values ...string) (resource.Quantity, error) {
var capacity string
for _, val := range values {
if strings.Contains(val, string("volume.kubernetes.io/capacity")) {
str := strings.Split(val, "=")
capacity = str[1]
}
}

return resource.ParseQuantity(capacity)

}

// GetPolicies returns the appropriate selection
// policies based on the provided values
func GetPolicies(values ...string) []buildOption {
Expand All @@ -460,6 +616,7 @@ func GetPolicies(values ...string) []buildOption {
opts = append(opts, AntiAffinityLabel(val))
}
}
opts = append(opts, OverProvisioning(values...))
return opts
}

Expand Down Expand Up @@ -527,10 +684,21 @@ func FilterPoolIDs(entries *csp.CSPList, opts []buildOption) ([]string, error) {
// go template functions
func TemplateFunctions() template.FuncMap {
return template.FuncMap{
"cspGetPolicies": GetPolicies,
"cspFilterPoolIDs": FilterPoolIDs,
"cspAntiAffinity": AntiAffinityLabel,
"cspPreferAntiAffinity": PreferAntiAffinityLabel,
"preferScheduleOnHost": PreferScheduleOnHostAnnotation,
"cspGetPolicies": GetPolicies,
"cspFilterPoolIDs": FilterPoolIDs,
"cspAntiAffinity": AntiAffinityLabel,
"cspPreferAntiAffinity": PreferAntiAffinityLabel,
"preferScheduleOnHost": PreferScheduleOnHostAnnotation,
"capacityAwareScheduling": OverProvisioning,
}
}

func GetSize(r resource.Quantity) {
fmt.Println("Printing Quantity:", r)
q, ok := r.AsInt64()
if !ok {
fmt.Println("Conversion failed")
}

fmt.Println("Int val", q)
}
10 changes: 5 additions & 5 deletions pkg/algorithm/cstorpoolselect/v1alpha1/select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func TestTemplateFunctionsCount(t *testing.T) {
tests := map[string]struct {
expectedLength int
}{
"Test 1": {5},
"Test 1": {6},
}

for name, test := range tests {
Expand Down Expand Up @@ -636,10 +636,10 @@ func TestGetPolicies(t *testing.T) {
selectors []string
expectedBuildoptions []buildOption
}{
"Test 1": {selectors: []string{}, expectedBuildoptions: []buildOption{}},
"Test 2": {selectors: []string{fakeAntiAffinitySelector}, expectedBuildoptions: []buildOption{AntiAffinityLabel(fakeAntiAffinitySelector)}},
"Test 3": {selectors: []string{fakePreferAntiAffinitySelector}, expectedBuildoptions: []buildOption{PreferAntiAffinityLabel(fakePreferAntiAffinitySelector)}},
"Test 4": {selectors: []string{fakePreferScheduleOnHostSelector}, expectedBuildoptions: []buildOption{PreferScheduleOnHostAnnotation(fakePreferScheduleOnHostSelector)}},
"Test 1": {selectors: []string{}, expectedBuildoptions: []buildOption{OverProvisioning()}},
"Test 2": {selectors: []string{fakeAntiAffinitySelector}, expectedBuildoptions: []buildOption{AntiAffinityLabel(fakeAntiAffinitySelector), OverProvisioning()}},
"Test 3": {selectors: []string{fakePreferAntiAffinitySelector}, expectedBuildoptions: []buildOption{PreferAntiAffinityLabel(fakePreferAntiAffinitySelector), OverProvisioning()}},
"Test 4": {selectors: []string{fakePreferScheduleOnHostSelector}, expectedBuildoptions: []buildOption{PreferScheduleOnHostAnnotation(fakePreferScheduleOnHostSelector), OverProvisioning()}},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/openebs.io/v1alpha1/cas_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ const (
// PredecessorBDKey is the key to fetch the predecessor BD in case of
// block device replacement.
PredecessorBDKey = "openebs.io/bd-predecessor"

// CStorPoolKey is the key to fetch the CVRs on a specific
// CStor pool. This key is present on CVR labels
CStorPoolKey CASKey = "cstorpool.openebs.io/name"

// cstorVolumeKey is the key to getch CStorVolume CR of a
// CVR. This key is present on CVR label.
CStorVolumeKey CASKey = "cstorVolumeMap"
)

// CASPlainKey represents a openebs key used either in resource annotation
Expand Down
Loading

0 comments on commit 97a8412

Please sign in to comment.