chore(refactor): add fake controller testing for cspi-controller #65

Merged
merged 5 commits into from May 24, 2020
9 changes: 5 additions & 4 deletions pkg/controllers/cspc-controller/handler_test.go
@@ -17,13 +17,17 @@ limitations under the License.
package cspccontroller

import (
"strconv"
"testing"
"time"

cstor "github.com/openebs/api/pkg/apis/cstor/v1"
openebscore "github.com/openebs/api/pkg/apis/openebs.io/v1alpha1"
"github.com/openebs/api/pkg/apis/types"
openebsFakeClientset "github.com/openebs/api/pkg/client/clientset/versioned/fake"
openebsinformers "github.com/openebs/api/pkg/client/informers/externalversions"
"github.com/openebs/cstor-operators/pkg/controllers/testutil"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -32,9 +36,6 @@ import (
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
"strconv"
"testing"
"time"
)

// fixture encapsulates fake client sets and client-go testing objects.
12 changes: 9 additions & 3 deletions pkg/controllers/cspi-controller/controller.go
@@ -30,7 +30,7 @@ import (

cstor "github.com/openebs/api/pkg/apis/cstor/v1"
common "github.com/openebs/cstor-operators/pkg/controllers/common"
zpool "github.com/openebs/cstor-operators/pkg/pool/operations"
zcmd "github.com/openebs/cstor-operators/pkg/zcmd/bin"

clientset "github.com/openebs/api/pkg/client/clientset/versioned"
openebsScheme "github.com/openebs/api/pkg/client/clientset/versioned/scheme"
@@ -59,6 +59,10 @@ type CStorPoolInstanceController struct {
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder

// zcmdExecutor is an interface that knows how to execute ZFS and ZPOOL commands.
// This is useful for mocking.
zcmdExecutor zcmd.Executor
}
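
The zcmdExecutor field added above refers to an Executor interface in pkg/zcmd/bin that this diff does not display. A minimal sketch of the shape such an interface would need for the calls made here, offered as an assumption rather than the package's actual definition:

// Sketch only: the real interface lives in
// github.com/openebs/cstor-operators/pkg/zcmd/bin and may differ.
// Executor abstracts running zpool/zfs binaries so that unit tests can
// substitute a fake instead of shelling out to the host.
type Executor interface {
	Execute(cmd string, args ...string) ([]byte, error)
}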

// NewCStorPoolInstanceController returns a new instance of CStorPoolInstance controller
@@ -70,8 +74,6 @@ func NewCStorPoolInstanceController(

// obtain references to shared index informers for the cStorPoolInstance resources
cStorPoolInstanceInformer := cStorInformerFactory.Cstor().V1().CStorPoolInstances()
zpool.KubeClient = kubeclientset
zpool.OpenEBSClient = clientset

err := openebsScheme.AddToScheme(scheme.Scheme)
if err != nil {
@@ -92,12 +94,16 @@
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: poolControllerName})

// Instantiate the zcmdExecutor to execute zpool/zfs commands
zcmdExecutor := zcmd.NewZcmd()

controller := &CStorPoolInstanceController{
kubeclientset: kubeclientset,
clientset: clientset,
cStorPoolInstanceSynced: cStorPoolInstanceInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), poolControllerName),
recorder: recorder,
zcmdExecutor: zcmdExecutor,
}

klog.Info("Setting up event handlers for CSPI")
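Because every dependency, including the command executor, is now injected through the struct rather than reached via package-level globals, a test can assemble the controller entirely from fakes. A rough sketch of such wiring; FakeExecutor here is illustrative, standing in for whatever helper the PR's pkg/controllers/testutil package actually provides:

// FakeExecutor is a hypothetical zcmd.Executor that returns canned output
// instead of invoking zpool/zfs on the host.
type FakeExecutor struct {
	Output []byte
	Err    error
}

func (f *FakeExecutor) Execute(cmd string, args ...string) ([]byte, error) {
	return f.Output, f.Err
}

func newFakeController() *CStorPoolInstanceController {
	return &CStorPoolInstanceController{
		// fake is k8s.io/client-go/kubernetes/fake; openebsFakeClientset is
		// the openebs/api fake clientset imported in the handler test above.
		kubeclientset: fake.NewSimpleClientset(),
		clientset:     openebsFakeClientset.NewSimpleClientset(),
		recorder:      record.NewFakeRecorder(1024),
		workqueue:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), poolControllerName),
		zcmdExecutor:  &FakeExecutor{Output: []byte("ONLINE")},
	}
}
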
74 changes: 61 additions & 13 deletions pkg/controllers/cspi-controller/handler.go
@@ -100,15 +100,26 @@ func (c *CStorPoolInstanceController) reconcile(key string) error {

cspi = cspiObj

// take a lock for common package for updating variables
common.SyncResources.Mux.Lock()
// validateCSPI validates the CSPI spec
err = validateCSPI(cspi)
if err != nil {
c.recorder.Event(cspi,
corev1.EventTypeWarning,
"Validation failed",
err.Error())
return nil
}

// Instantiate the pool operation config
// ToDo: NewOperationsConfig is used in other handlers too, e.g. destroy; fix the repetition.
oc := zpool.NewOperationsConfig().
WithKubeClientSet(c.kubeclientset).
WithOpenEBSClient(c.clientset).
WithRecorder(c.recorder)
WithRecorder(c.recorder).
WithZcmdExecutor(c.zcmdExecutor)

// take a lock for common package for updating variables
common.SyncResources.Mux.Lock()

// try to import pool
isImported, err = oc.Import(cspi)
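
A validation failure above surfaces only as a warning event while reconcile returns nil, so a fake-controller test would assert on the recorder rather than on the returned error. A sketch of a test fragment against client-go's record.FakeRecorder, assuming the emitted event contains the "Validation failed" reason string:

// Fragment of a test body; the controller is constructed with this recorder.
recorder := record.NewFakeRecorder(10)
// ... invoke reconcile with a CSPI that has no DataRaidGroups ...
select {
case ev := <-recorder.Events:
	if !strings.Contains(ev, "Validation failed") {
		t.Errorf("unexpected event: %q", ev)
	}
default:
	t.Error("expected a validation-failure event")
}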
@@ -192,7 +203,9 @@ func (c *CStorPoolInstanceController) destroy(cspi *cstor.CStorPoolInstance) err
// Instantiate the pool operation config
oc := zpool.NewOperationsConfig().
WithKubeClientSet(c.kubeclientset).
WithOpenEBSClient(c.clientset)
WithOpenEBSClient(c.clientset).
WithZcmdExecutor(c.zcmdExecutor)

// DeletePool is to delete cstor zpool.
// It will also clear the label for relevant disk
err := oc.Delete(cspi)
@@ -218,7 +231,7 @@ func (c *CStorPoolInstanceController) destroy(cspi *cstor.CStorPoolInstance) err

updatestatus:
cspi.Status.Phase = phase
if _, er := zpool.OpenEBSClient.
if _, er := c.clientset.
CstorV1().
CStorPoolInstances(cspi.Namespace).
Update(cspi); er != nil {
@@ -231,7 +244,8 @@ func (c *CStorPoolInstanceController) update(cspi *cstor.CStorPoolInstance) (*cs
oc := zpool.NewOperationsConfig().
WithKubeClientSet(c.kubeclientset).
WithOpenEBSClient(c.clientset).
WithRecorder(c.recorder)
WithRecorder(c.recorder).
WithZcmdExecutor(c.zcmdExecutor)
ncspi, err := oc.Update(cspi)
if err != nil {
return ncspi, errors.Errorf("Failed to update pool due to %s", err.Error())
@@ -245,10 +259,12 @@ func (c *CStorPoolInstanceController) updateStatus(cspi *cstor.CStorPoolInstance
var status cstor.CStorPoolInstanceStatus
pool := zpool.PoolName()
propertyList := []string{"health", "io.openebs:readonly"}
oc := zpool.NewOperationsConfig().
WithZcmdExecutor(c.zcmdExecutor)

// Since we queried health and io.openebs:readonly in that order, the
// output will also be in the same order
valueList, err := zpool.GetListOfPropertyValues(pool, propertyList)
valueList, err := oc.GetListOfPropertyValues(pool, propertyList)
if err != nil {
return cspi, errors.Errorf("Failed to fetch %v output: %v error: %v", propertyList, valueList, err)
} else {
@@ -260,7 +276,7 @@
}
}

status.Capacity, err = zpool.GetCSPICapacity(pool)
status.Capacity, err = oc.GetCSPICapacity(pool)
if err != nil {
return cspi, errors.Errorf("Failed to sync due to %s", err.Error())
}
@@ -272,7 +288,7 @@

if IsStatusChange(cspi.Status, status) {
cspi.Status = status
cspiGot, err := zpool.OpenEBSClient.
cspiGot, err := c.clientset.
CstorV1().
CStorPoolInstances(cspi.Namespace).
Update(cspi)
@@ -295,20 +311,23 @@ func (c *CStorPoolInstanceController) updateStatus(cspi *cstor.CStorPoolInstance
func (c *CStorPoolInstanceController) updateROMode(
cspiStatus *cstor.CStorPoolInstanceStatus, cspi cstor.CStorPoolInstance) {
roThresholdLimit := 85

if cspi.Spec.PoolConfig.ROThresholdLimit != nil {
roThresholdLimit = *cspi.Spec.PoolConfig.ROThresholdLimit
}
totalInBytes := cspiStatus.Capacity.Total.Value()
usedInBytes := cspiStatus.Capacity.Used.Value()
pool := zpool.PoolName()
oc := zpool.NewOperationsConfig().
WithZcmdExecutor(c.zcmdExecutor)

usedPercentage := (usedInBytes * 100) / totalInBytes
// If roThresholdLimit is set to 100% and pool used storage reaches 100%,
// there is a chance that operations will hang, so it is not
// recommended to perform operations in that case
if (int(usedPercentage) >= roThresholdLimit) && roThresholdLimit != 100 {
if !cspiStatus.ReadOnly {
if err := zpool.SetPoolRDMode(pool, true); err != nil {
if err := oc.SetPoolRDMode(pool, true); err != nil {
// Here we just log the error; it will be retried in the next reconciliation
klog.Errorf("failed to set pool ReadOnly Mode to %t error: %s", true, err.Error())
} else {
Expand All @@ -323,7 +342,7 @@ func (c *CStorPoolInstanceController) updateROMode(
}
} else {
if cspiStatus.ReadOnly {
if err := zpool.SetPoolRDMode(pool, false); err != nil {
if err := oc.SetPoolRDMode(pool, false); err != nil {
klog.Errorf("Failed to unset pool readOnly mode : %v", err)
} else {
cspiStatus.ReadOnly = false
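
To make the threshold arithmetic above concrete: with the default roThresholdLimit of 85, a pool with 100GiB total and 90GiB used gives usedPercentage = (90 * 100) / 100 = 90, which is >= 85, so the first branch marks the pool read-only; once usage drops back below 85% the else branch clears the flag. Note the integer division: 84.9% usage truncates to 84, so the pool stays writable.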
@@ -405,10 +424,12 @@ func (c *CStorPoolInstanceController) addPoolProtectionFinalizer(
}

func (c *CStorPoolInstanceController) sync(cspi *cstor.CStorPoolInstance) {
oc := zpool.NewOperationsConfig().
WithZcmdExecutor(c.zcmdExecutor)
// Right now the only sync activity is compression
compressionType := cspi.Spec.PoolConfig.Compression
poolName := zpool.PoolName()
err := zpool.SetCompression(poolName, compressionType)
err := oc.SetCompression(poolName, compressionType)
if err != nil {
c.recorder.Event(cspi,
corev1.EventTypeWarning,
Expand All @@ -422,8 +443,10 @@ func (c *CStorPoolInstanceController) addDiskUnavailableCondition(cspi *cstor.CS
oc := zpool.NewOperationsConfig().
WithKubeClientSet(c.kubeclientset).
WithOpenEBSClient(c.clientset).
WithRecorder(c.recorder)
WithRecorder(c.recorder).
WithZcmdExecutor(c.zcmdExecutor)
unAvailableDisks, err := oc.GetUnavailableDiskList(cspi)

if err != nil {
klog.Errorf("failed to get unavailable disks error: %v", err)
return
@@ -492,3 +515,28 @@ func (c *CStorPoolInstanceController) reconcileVersion(cspi *cstor.CStorPoolInst
}
return cspi, nil
}

// validateCSPI returns an error if CSPI spec validation fails, otherwise nil
func validateCSPI(cspi *cstor.CStorPoolInstance) error {
if len(cspi.Spec.DataRaidGroups) == 0 {
return errors.Errorf("No data RaidGroups exists")
}
if cspi.Spec.PoolConfig.DataRaidGroupType == "" {
return errors.Errorf("Missing DataRaidGroupType")
}
if len(cspi.Spec.WriteCacheRaidGroups) != 0 &&
cspi.Spec.PoolConfig.WriteCacheGroupType == "" {
return errors.Errorf("Missing WriteCacheRaidGroupType")
}
for _, rg := range cspi.Spec.DataRaidGroups {
if len(rg.CStorPoolInstanceBlockDevices) == 0 {
return errors.Errorf("No BlockDevices exist in one of the DataRaidGroup")
}
}
for _, rg := range cspi.Spec.WriteCacheRaidGroups {
if len(rg.CStorPoolInstanceBlockDevices) == 0 {
return errors.Errorf("No BlockDevices exist in one of the WriteCache RaidGroup")
}
}
return nil
}
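
Being a pure function over the spec, validateCSPI lends itself to a table-driven test in the same package. A minimal sketch; the cstor type and field names are inferred from the fields used above rather than verified against the openebs/api package:

func TestValidateCSPI(t *testing.T) {
	tests := map[string]struct {
		cspi    *cstor.CStorPoolInstance
		wantErr bool
	}{
		"no data raid groups": {
			cspi:    &cstor.CStorPoolInstance{},
			wantErr: true,
		},
		"missing data raid group type": {
			cspi: &cstor.CStorPoolInstance{
				Spec: cstor.CStorPoolInstanceSpec{
					DataRaidGroups: []cstor.RaidGroup{{}},
				},
			},
			wantErr: true,
		},
		"valid stripe pool": {
			cspi: &cstor.CStorPoolInstance{
				Spec: cstor.CStorPoolInstanceSpec{
					PoolConfig: cstor.PoolConfig{DataRaidGroupType: "stripe"},
					DataRaidGroups: []cstor.RaidGroup{
						{CStorPoolInstanceBlockDevices: []cstor.CStorPoolInstanceBlockDevice{
							{BlockDeviceName: "bd-0"},
						}},
					},
				},
			},
			wantErr: false,
		},
	}
	for name, tt := range tests {
		t.Run(name, func(t *testing.T) {
			if err := validateCSPI(tt.cspi); (err != nil) != tt.wantErr {
				t.Errorf("validateCSPI() error = %v, wantErr %v", err, tt.wantErr)
			}
		})
	}
}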