Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(webhook): add cspc scaledown validations #101

Merged
merged 5 commits into from
Jun 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ script:
# dependencies.
- ./ci/sanity/install.sh
# Run the sanity integration test for cstor-operators
# - ./ci/sanity/sanity.sh
- ./ci/sanity/sanity.sh

after_success:
- make deploy-images
Expand Down
71 changes: 71 additions & 0 deletions pkg/webhook/cspc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pkg/errors"
"k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog"
)

Expand Down Expand Up @@ -488,6 +489,16 @@ func (wh *webhook) validateCSPCUpdateRequest(req *v1beta1.AdmissionRequest, getC
return response
}
pOps := NewPoolOperations(wh.kubeClient, wh.clientset).WithNewCSPC(&cspcNew).WithOldCSPC(cspcOld)

if ok, msg := pOps.ValidateScaledown(); !ok {
err = errors.Errorf("invalid cspc specification: %s", msg)
shubham14bajpai marked this conversation as resolved.
Show resolved Hide resolved
// As scale down validation may take more time than the timeout value set
// on webhook having a log will help debug
klog.Error(err)
response = BuildForAPIObject(response).UnSetAllowed().WithResultAsFailure(err, http.StatusUnprocessableEntity).AR
return response
}

commonPoolSpec, err := getCommonPoolSpecs(&cspcNew, cspcOld, wh.kubeClient)

if err != nil {
Expand All @@ -503,3 +514,63 @@ func (wh *webhook) validateCSPCUpdateRequest(req *v1beta1.AdmissionRequest, getC

return response
}

// ValidateScaledown validates whether any cvr exist on the cspi
// that is being scaled down
func (p *PoolOperations) ValidateScaledown() (bool, string) {
removedPools := []string{}
for _, oldPool := range p.OldCSPC.Spec.Pools {
found := false
for _, newPool := range p.NewCSPC.Spec.Pools {
if reflect.DeepEqual(oldPool.NodeSelector, newPool.NodeSelector) {
found = true
break
}
}
if !found && p.IsScaledownCase(oldPool) {
cspi, err := p.clientset.CstorV1().CStorPoolInstances(p.OldCSPC.Namespace).
List(metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(oldPool.NodeSelector).String() + "," +
types.CStorPoolClusterLabelKey + "=" + p.OldCSPC.Name,
})
if err != nil {
return false, fmt.Sprintf("Could not list cspi for cspc %s: %s", p.OldCSPC.Name, err.Error())
}
removedPools = append(removedPools, cspi.Items[0].Name)
}
}
for _, cspiName := range removedPools {
// list cvrs in cspc namespaces
cvrList, err := p.clientset.CstorV1().CStorVolumeReplicas(p.OldCSPC.Namespace).List(metav1.ListOptions{
LabelSelector: types.CStorPoolInstanceNameLabelKey + "=" + cspiName,
})
if err != nil {
return false, fmt.Sprintf("Could not list cvr for cspi %s: %s", cspiName, err.Error())
}
if len(cvrList.Items) != 0 {
return false, fmt.Sprintf("volume still exists on pool %s", cspiName)
}
}
return true, ""
}

// IsScaledownCase checks whether it is scale down case or the node selector
// for an exsiting pool got changed.
func (p *PoolOperations) IsScaledownCase(oldPool cstor.PoolSpec) bool {
bdMap := map[string]int{}
for _, newPool := range p.NewCSPC.Spec.Pools {
for _, newRg := range append(newPool.DataRaidGroups, newPool.WriteCacheRaidGroups...) {
for _, newBD := range newRg.CStorPoolInstanceBlockDevices {
bdMap[newBD.BlockDeviceName]++
}
}
}
for _, oldRg := range append(oldPool.DataRaidGroups, oldPool.WriteCacheRaidGroups...) {
for _, oldBD := range oldRg.CStorPoolInstanceBlockDevices {
if bdMap[oldBD.BlockDeviceName] > 0 {
return false
}
}
}
return true
}
246 changes: 246 additions & 0 deletions pkg/webhook/cspc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,3 +945,249 @@ func TestBlockDeviceReplacement(t *testing.T) {
// Set OPENEBS_NAMESPACE env
os.Unsetenv("OPENEBS_NAMESPACE")
}

func TestCSPCScaleDown(t *testing.T) {
f := newFixture().withOpenebsObjects().withKubeObjects()
f.fakeNodeCreator(3)
// Each node will have 20 blockdevices
f.fakeBlockDeviceCreator(60, 3)
tests := map[string]struct {
// existingObj is object existing in etcd via fake client
existingObj *cstor.CStorPoolCluster
requestedObj *cstor.CStorPoolCluster
expectedRsp bool
getCSPCObj getCSPC
existingCSPIs []*cstor.CStorPoolInstance
existingCVRs []*cstor.CStorVolumeReplica
}{
"Negative scaledown case": {
existingObj: cstor.NewCStorPoolCluster().
WithName("cspc-foo-mirror").
WithNamespace("openebs").
WithPoolSpecs(
*cstor.NewPoolSpec().
WithNodeSelector(map[string]string{types.HostNameLabelKey: "worker-1"}).
WithPoolConfig(*cstor.NewPoolConfig().
WithDataRaidGroupType("mirror")).
WithDataRaidGroups(*cstor.NewRaidGroup().
WithCStorPoolInstanceBlockDevices(
*cstor.NewCStorPoolInstanceBlockDevice().WithName("blockdevice-1"),
*cstor.NewCStorPoolInstanceBlockDevice().WithName("blockdevice-2"),
),
),
*cstor.NewPoolSpec().
WithNodeSelector(map[string]string{types.HostNameLabelKey: "worker-2"}).
WithPoolConfig(*cstor.NewPoolConfig().
WithDataRaidGroupType("mirror")).
WithDataRaidGroups(*cstor.NewRaidGroup().
WithCStorPoolInstanceBlockDevices(
*cstor.NewCStorPoolInstanceBlockDevice().WithName("blockdevice-3"),
*cstor.NewCStorPoolInstanceBlockDevice().WithName("blockdevice-4"),
),
),
),
requestedObj: cstor.NewCStorPoolCluster().
WithName("cspc-foo-mirror").
WithNamespace("openebs").
WithPoolSpecs(
*cstor.NewPoolSpec().
WithNodeSelector(map[string]string{types.HostNameLabelKey: "worker-1"}).
WithPoolConfig(*cstor.NewPoolConfig().
WithDataRaidGroupType("mirror")).
WithDataRaidGroups(*cstor.NewRaidGroup().
WithCStorPoolInstanceBlockDevices(
*cstor.NewCStorPoolInstanceBlockDevice().WithName("blockdevice-1"),
*cstor.NewCStorPoolInstanceBlockDevice().WithName("blockdevice-2"),
),
),
),
existingCSPIs: []*cstor.CStorPoolInstance{
{
ObjectMeta: metav1.ObjectMeta{
Name: "cspc-foo-mirror-1",
Namespace: "openebs",
Labels: map[string]string{
types.HostNameLabelKey: "worker-1",
types.CStorPoolClusterLabelKey: "cspc-foo-mirror",
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "cspc-foo-mirror-2",
Namespace: "openebs",
Labels: map[string]string{
types.HostNameLabelKey: "worker-2",
types.CStorPoolClusterLabelKey: "cspc-foo-mirror",
},
},
},
},
existingCVRs: []*cstor.CStorVolumeReplica{
{
ObjectMeta: metav1.ObjectMeta{
Name: "cvr-cspc-foo-mirror-1",
Namespace: "openebs",
Labels: map[string]string{
types.HostNameLabelKey: "worker-1",
types.CStorPoolInstanceNameLabelKey: "cspc-foo-mirror-1",
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "cvr-cspc-foo-mirror-2",
Namespace: "openebs",
Labels: map[string]string{
types.HostNameLabelKey: "worker-2",
types.CStorPoolInstanceNameLabelKey: "cspc-foo-mirror-2",
},
},
},
},
expectedRsp: false,
getCSPCObj: getCSPCObject,
},
"Positive scaledown case": {
existingObj: cstor.NewCStorPoolCluster().
WithName("cspc-bar-mirror").
WithNamespace("openebs").
WithPoolSpecs(
*cstor.NewPoolSpec().
WithNodeSelector(map[string]string{types.HostNameLabelKey: "worker-1"}).
WithPoolConfig(*cstor.NewPoolConfig().
WithDataRaidGroupType("mirror")).
WithDataRaidGroups(*cstor.NewRaidGroup().
WithCStorPoolInstanceBlockDevices(
*cstor.NewCStorPoolInstanceBlockDevice().WithName("blockdevice-5"),
*cstor.NewCStorPoolInstanceBlockDevice().WithName("blockdevice-6"),
),
),
*cstor.NewPoolSpec().
WithNodeSelector(map[string]string{types.HostNameLabelKey: "worker-2"}).
WithPoolConfig(*cstor.NewPoolConfig().
WithDataRaidGroupType("mirror")).
WithDataRaidGroups(*cstor.NewRaidGroup().
WithCStorPoolInstanceBlockDevices(
*cstor.NewCStorPoolInstanceBlockDevice().WithName("blockdevice-7"),
*cstor.NewCStorPoolInstanceBlockDevice().WithName("blockdevice-8"),
),
),
),
requestedObj: cstor.NewCStorPoolCluster().
WithName("cspc-bar-mirror").
WithNamespace("openebs").
WithPoolSpecs(
*cstor.NewPoolSpec().
WithNodeSelector(map[string]string{types.HostNameLabelKey: "worker-1"}).
WithPoolConfig(*cstor.NewPoolConfig().
WithDataRaidGroupType("mirror")).
WithDataRaidGroups(*cstor.NewRaidGroup().
WithCStorPoolInstanceBlockDevices(
*cstor.NewCStorPoolInstanceBlockDevice().WithName("blockdevice-5"),
*cstor.NewCStorPoolInstanceBlockDevice().WithName("blockdevice-6"),
),
),
),
existingCSPIs: []*cstor.CStorPoolInstance{
{
ObjectMeta: metav1.ObjectMeta{
Name: "cspc-bar-mirror-1",
Namespace: "openebs",
Labels: map[string]string{
types.HostNameLabelKey: "worker-1",
types.CStorPoolClusterLabelKey: "cspc-bar-mirror",
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "cspc-bar-mirror-2",
Namespace: "openebs",
Labels: map[string]string{
types.HostNameLabelKey: "worker-2",
types.CStorPoolClusterLabelKey: "cspc-bar-mirror",
},
},
},
},
existingCVRs: []*cstor.CStorVolumeReplica{
{
ObjectMeta: metav1.ObjectMeta{
Name: "cvr=cspc-bar-mirror-1",
Namespace: "openebs",
Labels: map[string]string{
types.HostNameLabelKey: "worker-1",
types.CStorPoolInstanceNameLabelKey: "cspc-bar-mirror-1",
},
},
},
},
expectedRsp: true,
getCSPCObj: getCSPCObject,
},
}
// Set OPENEBS_NAMESPACE env
os.Setenv("OPENEBS_NAMESPACE", "openebs")
for name, test := range tests {
name, test := name, test
t.Run(name, func(t *testing.T) {
ar := &v1beta1.AdmissionRequest{
Operation: v1beta1.Create,
Object: runtime.RawExtension{
Raw: serialize(test.requestedObj),
},
}
// Create fake object in etcd
_, err := f.wh.clientset.CstorV1().
CStorPoolClusters(test.existingObj.Namespace).
Create(test.existingObj)
if err != nil {
t.Fatalf(
"failed to create fake CSPC %s Object in Namespace %s error: %v",
test.existingObj.Name,
test.existingObj.Namespace,
err,
)
}
for _, cspi := range test.existingCSPIs {
_, err := f.wh.clientset.CstorV1().
CStorPoolInstances(cspi.Namespace).
Create(cspi)
if err != nil {
t.Fatalf(
"failed to create fake CSPI %s Object in Namespace %s error: %v",
cspi.Name,
cspi.Namespace,
err,
)
}
}
for _, cvr := range test.existingCVRs {
_, err := f.wh.clientset.CstorV1().
CStorVolumeReplicas(cvr.Namespace).
Create(cvr)
if err != nil {
t.Fatalf(
"failed to create fake CVR %s Object in Namespace %s error: %v",
cvr.Name,
cvr.Namespace,
err,
)
}
}
resp := f.wh.validateCSPCUpdateRequest(ar, test.getCSPCObj)
if resp.Allowed != test.expectedRsp {
t.Errorf(
"%s test case failed expected response: %t but got %t error: %s",
name,
test.expectedRsp,
resp.Allowed,
resp.Result.Message,
)
}
})
}
// Set OPENEBS_NAMESPACE env
os.Unsetenv("OPENEBS_NAMESPACE")
}