Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core implementation for test runs in PLZ mode #239

Merged
merged 7 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions api/v1alpha1/k6_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ type K6Configmap struct {
type Cleanup string

// Stage describes which stage of the test execution lifecycle our runners are in
// +kubebuilder:validation:Enum=initialization;initialized;created;started;finished;error
// +kubebuilder:validation:Enum=initialization;initialized;created;started;stopped;finished;error
type Stage string

// K6Status defines the observed state of K6
Expand Down Expand Up @@ -157,7 +157,8 @@ func init() {
}

// Parse extracts Script data bits from K6 spec and performs basic validation
func (spec K6Script) Parse() (*types.Script, error) {
func (k6 K6Spec) ParseScript() (*types.Script, error) {
spec := k6.Script
s := &types.Script{
Filename: "test.js",
Path: "/test/",
Expand Down
18 changes: 17 additions & 1 deletion api/v1alpha1/k6conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ const (
// - if True, it's after successful starter but before all runners have finished
TestRunRunning = "TestRunRunning"

// CloudTestRun indicates if this test run is supposed to be a cloud test run.
// CloudTestRun indicates if this test run is supposed to be a cloud test run
// (i.e. with `--out cloud` option).
// - if empty / Unknown, the type of test is unknown yet
// - if False, it is not a cloud test run
// - if True, it is a cloud test run
Expand All @@ -42,6 +43,14 @@ const (
// - if False, it's not a PLZ test run.
// - if True, it is a PLZ test run.
CloudPLZTestRun = "CloudPLZTestRun"

// CloudTestRunAborted indicates if this k6 Cloud test run was aborted externally,
// for any reason.
// This condition is valid only if CloudPLZTestRun is True as well.
// - if empty / Unknown, it's either a non-PLZ test run or it's unknown yet.
// - if False, it's a PLZ test run and it wasn't aborted.
// - if True, it is a PLZ test run and it was aborted.
CloudTestRunAborted = "CloudTestRunAborted"
)

// Initialize defines only conditions common to all test runs.
Expand Down Expand Up @@ -69,6 +78,8 @@ func (k6 *K6) Initialize() {
k6.UpdateCondition(CloudTestRun, metav1.ConditionTrue)
k6.UpdateCondition(CloudPLZTestRun, metav1.ConditionTrue)
k6.UpdateCondition(CloudTestRunCreated, metav1.ConditionTrue)
k6.UpdateCondition(CloudTestRunFinalized, metav1.ConditionFalse)
k6.UpdateCondition(CloudTestRunAborted, metav1.ConditionFalse)

k6.Status.TestRunID = k6.Spec.TestRunID
} else {
Expand Down Expand Up @@ -145,6 +156,11 @@ func (k6status *K6Status) SetIfNewer(proposedStatus K6Status) (isNewer bool) {
isNewer = true
}
case "started":
if proposedStatus.Stage == "stopped" || proposedStatus.Stage == "finished" || proposedStatus.Stage == "error" {
k6status.Stage = proposedStatus.Stage
isNewer = true
}
case "stopped":
if proposedStatus.Stage == "finished" || proposedStatus.Stage == "error" {
k6status.Stage = proposedStatus.Stage
isNewer = true
Expand Down
36 changes: 28 additions & 8 deletions api/v1alpha1/privateloadzone_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"

"github.com/go-logr/logr"
"github.com/grafana/k6-operator/pkg/cloud"

"go.k6.io/k6/cloudapi"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -65,20 +67,38 @@ func init() {
}

// Register attempts to register PLZ with the k6 Cloud.
// Regardless of the result, condition PLZRegistered will be updated.
func (plz *PrivateLoadZone) Register(ctx context.Context, logger logr.Logger, client *cloudapi.Client) {
// Regardless of the result, condition PLZRegistered will be set to False.
// The caller is expected to check the returned error and set the condition when appropriate.
func (plz *PrivateLoadZone) Register(ctx context.Context, logger logr.Logger, client *cloudapi.Client) error {
plz.UpdateCondition(PLZRegistered, metav1.ConditionFalse)

// TODO add register call and error processing
data := cloud.PLZRegistrationData{
LoadZoneID: plz.Name,
Resources: cloud.PLZResources{
CPU: plz.Spec.Resources.Cpu().String(),
Memory: plz.Spec.Resources.Memory().String(),
},
}

if err := cloud.RegisterPLZ(client, data); err != nil {
logger.Error(err, fmt.Sprintf("Failed to register PLZ %s.", plz.Name))
return err
}

logger.Info(fmt.Sprintf("Registered PLZ %s.", plz.Name))

plz.UpdateCondition(PLZRegistered, metav1.ConditionTrue)
return nil
}

// Deregister attempts to deregister PLZ with the k6 Cloud.
// It is meant to be used as a finalizer.
func (plz *PrivateLoadZone) Deregister(ctx context.Context, logger logr.Logger, client *cloudapi.Client) {
// TODO add deregister call and error processing
func (plz *PrivateLoadZone) Deregister(ctx context.Context, logger logr.Logger, client *cloudapi.Client) error {
if err := cloud.DeRegisterPLZ(client, plz.Name); err != nil {
logger.Error(err, fmt.Sprintf("Failed to de-register PLZ %s.", plz.Name))
return err
}

fmt.Println("calling deregister for", *plz)
plz.UpdateCondition(PLZRegistered, metav1.ConditionFalse)
logger.Info(fmt.Sprintf("De-registered PLZ %s.", plz.Name))

return nil
}
1 change: 1 addition & 0 deletions config/crd/bases/k6.io_k6s.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4880,6 +4880,7 @@ spec:
- initialized
- created
- started
- stopped
- finished
- error
type: string
Expand Down
157 changes: 131 additions & 26 deletions controllers/k6_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package controllers

import (
"context"
"errors"
"fmt"
"time"

"go.k6.io/k6/cloudapi"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

Expand Down Expand Up @@ -46,6 +48,10 @@ type K6Reconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme

// Note: here we assume that all users of the operator are allowed to use
// the same token / cloud client.
k6CloudClient *cloudapi.Client
}

// Reconcile takes a K6 object and takes the appropriate action in the cluster
Expand Down Expand Up @@ -73,6 +79,19 @@ func (r *K6Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Re
return ctrl.Result{Requeue: true}, err
}

if k6.IsTrue(v1alpha1.CloudPLZTestRun) {
// bootstrap the client
found, err := r.createClient(ctx, k6, log)
if err != nil {
log.Error(err, "A problem while getting token.")
return ctrl.Result{}, err
}
if !found {
log.Info(fmt.Sprintf("Token `%s` is not found yet.", k6.Spec.Token))
return ctrl.Result{RequeueAfter: time.Second}, nil
}
}

log.Info(fmt.Sprintf("Reconcile(); stage = %s", k6.Status.Stage))

// Decision making here is now a mix between stages and conditions.
Expand Down Expand Up @@ -116,13 +135,9 @@ func (r *K6Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Re
}
}

// log.Info(fmt.Sprintf("Debug \"initialization\" %v %v",
// k6.IsTrue(v1alpha1.CloudTestRun),
// k6.IsTrue(v1alpha1.CloudTestRunCreated)))

if k6.IsTrue(v1alpha1.CloudTestRun) {

if k6.IsFalse(v1alpha1.CloudTestRunCreated) && k6.IsFalse(v1alpha1.CloudPLZTestRun) {
if k6.IsFalse(v1alpha1.CloudTestRunCreated) {
return SetupCloudTest(ctx, log, k6, r)

} else {
Expand All @@ -146,45 +161,43 @@ func (r *K6Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Re
return StartJobs(ctx, log, k6, r)

case "started":
// log.Info(fmt.Sprintf("Debug \"started\" %v %v",
// k6.IsTrue(v1alpha1.CloudTestRun),
// k6.IsTrue(v1alpha1.CloudTestRunFinalized)))

if k6.IsTrue(v1alpha1.CloudTestRun) && k6.IsTrue(v1alpha1.CloudTestRunFinalized) {
// a fluke - nothing to do
return ctrl.Result{}, nil
}

if k6.IsTrue(v1alpha1.CloudTestRunAborted) {
// a fluke - nothing to do
return ctrl.Result{}, nil
}

// wait for the test to finish
if !FinishJobs(ctx, log, k6, r) {

if k6.IsTrue(v1alpha1.CloudPLZTestRun) && k6.IsFalse(v1alpha1.CloudTestRunAborted) {
// check in with the BE for status
if r.ShouldAbort(ctx, k6, log) {
log.Info("Received an abort signal from the k6 Cloud: stopping the test.")
return StopJobs(ctx, log, k6, r)
}
}

// The test continues to execute.

// Test runs can take a long time and usually they aren't supposed
// to be too quick. So check in only periodically.
return ctrl.Result{RequeueAfter: time.Second * 15}, nil
}

log.Info("All runner pods are finished")

// now mark it as finished
// now mark it as stopped

if k6.IsTrue(v1alpha1.TestRunRunning) {
k6.UpdateCondition(v1alpha1.TestRunRunning, metav1.ConditionFalse)

log.Info("Changing stage of K6 status to finished")
k6.Status.Stage = "finished"

// If this is a test run with cloud output, try to finalize it.
if k6.IsTrue(v1alpha1.CloudTestRun) &&
k6.IsFalse(v1alpha1.CloudPLZTestRun) &&
k6.IsFalse(v1alpha1.CloudTestRunFinalized) {
if err = cloud.FinishTestRun(k6.Status.TestRunID); err != nil {
log.Error(err, "Failed to finalize the test run with cloud output")
return ctrl.Result{}, nil
} else {
log.Info(fmt.Sprintf("Cloud test run %s was finalized succesfully", k6.Status.TestRunID))

k6.UpdateCondition(v1alpha1.CloudTestRunFinalized, metav1.ConditionTrue)
}
}
log.Info("Changing stage of K6 status to stopped")
k6.Status.Stage = "stopped"

_, err := r.UpdateStatus(ctx, k6, log)
if err != nil {
Expand All @@ -195,6 +208,57 @@ func (r *K6Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Re

return ctrl.Result{}, nil

case "stopped":
if k6.IsTrue(v1alpha1.CloudPLZTestRun) && k6.IsTrue(v1alpha1.CloudTestRunAborted) {
// This is a "forced" abort of the PLZ test run.
// Wait until all the test runs are stopped, kill jobs and proceed.
if StoppedJobs(ctx, log, k6, r) {
if allDeleted, err := KillJobs(ctx, log, k6, r); err != nil {
return ctrl.Result{RequeueAfter: time.Second}, err
} else {
// if we just have deleted all jobs, update status and go for reconcile
if allDeleted {
k6.UpdateCondition(v1alpha1.CloudTestRunAborted, metav1.ConditionTrue)
_, err := r.UpdateStatus(ctx, k6, log)
if err != nil {
return ctrl.Result{}, err
}
}
}
}
}

// If this is a cloud test run in any mode, try to finalize it.
if k6.IsTrue(v1alpha1.CloudTestRun) &&
k6.IsFalse(v1alpha1.CloudTestRunFinalized) {

// If TestRunRunning has just been updated, wait for a bit before
// acting, to avoid race condition between different reconcile loops.
t, _ := k6.LastUpdate(v1alpha1.TestRunRunning)
if time.Now().Sub(t) < 5*time.Second {
return ctrl.Result{RequeueAfter: time.Second * 2}, nil
}

if err = cloud.FinishTestRun(r.k6CloudClient, k6.Status.TestRunID); err != nil {
log.Error(err, "Failed to finalize the test run with cloud output")
return ctrl.Result{}, nil
} else {
log.Info(fmt.Sprintf("Cloud test run %s was finalized successfully", k6.Status.TestRunID))

k6.UpdateCondition(v1alpha1.CloudTestRunFinalized, metav1.ConditionTrue)
}
}

log.Info("Changing stage of K6 status to finished")
k6.Status.Stage = "finished"

_, err := r.UpdateStatus(ctx, k6, log)
if err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{RequeueAfter: time.Second}, nil

case "error", "finished":
// delete if configured
if k6.Spec.Cleanup == "post" {
Expand Down Expand Up @@ -284,3 +348,44 @@ func (r *K6Reconciler) UpdateStatus(ctx context.Context, k6 *v1alpha1.K6, log lo

return true, nil
}

// ShouldAbort asks the k6 Cloud for the current state of the test run and
// reports whether that state calls for a forced stop of the test.
// It is meant to be used only by PLZ test runs. When the test run ID is empty
// or the state cannot be fetched, the problem is logged and false is returned.
func (r *K6Reconciler) ShouldAbort(ctx context.Context, k6 *v1alpha1.K6, log logr.Logger) bool {
	testRunID := k6.Status.TestRunID

	// Without an ID there is nothing to query.
	if len(testRunID) == 0 {
		log.Error(errors.New("empty test run ID"), "Trying to get state of test run with empty test run ID")
		return false
	}

	state, err := cloud.GetTestRunState(r.k6CloudClient, testRunID, log)
	if err != nil {
		log.Error(err, "Failed to get test run state.")
		return false
	}

	// Evaluate the abort flag first, then log the received state.
	abort := state.Aborted()
	log.Info(fmt.Sprintf("Received test run status %v", state))

	return abort
}

// createClient makes sure the reconciler holds a k6 Cloud client.
// An already existing client is reused as is. Otherwise loadToken is called
// with k6.Spec.Token and list options scoped to the K6 object's namespace,
// and a new client is built from the token and whatever getEnvVar yields for
// K6_CLOUD_HOST in the runner's env.
// It returns (true, nil) when a client is available, (false, nil) when the
// token is not ready yet, and (false, err) when loading the token failed.
func (r *K6Reconciler) createClient(ctx context.Context, k6 *v1alpha1.K6, log logr.Logger) (bool, error) {
	// Nothing to do if the client has been bootstrapped already.
	if r.k6CloudClient != nil {
		return true, nil
	}

	listOpts := &client.ListOptions{Namespace: k6.Namespace}
	secret, ready, err := loadToken(ctx, log, r.Client, k6.Spec.Token, listOpts)
	if err != nil {
		log.Error(err, "A problem while getting token.")
		return false, err
	}
	if !ready {
		return false, nil
	}

	cloudHost := getEnvVar(k6.Spec.Runner.Env, "K6_CLOUD_HOST")
	r.k6CloudClient = cloud.NewClient(log, secret, cloudHost)

	return true, nil
}
14 changes: 11 additions & 3 deletions controllers/k6_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,22 @@ func CreateJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reco
var (
err error
res ctrl.Result
token string // only for cloud output tests
token string // only for cloud tests
)

if k6.IsTrue(v1alpha1.CloudTestRun) && k6.IsTrue(v1alpha1.CloudTestRunCreated) {
log = log.WithValues("testRunId", k6.Status.TestRunID)

var tokenReady bool
token, tokenReady, err = loadToken(ctx, log, r.Client, k6.Spec.Token, &client.ListOptions{Namespace: k6.Namespace})
var (
tokenReady bool
sOpts *client.ListOptions
)

if k6.IsTrue(v1alpha1.CloudPLZTestRun) {
sOpts = &client.ListOptions{Namespace: k6.Namespace}
}

token, tokenReady, err = loadToken(ctx, log, r.Client, k6.Spec.Token, sOpts)
if err != nil {
// An error here means a very likely mis-configuration of the token.
// Consider updating status to error to let a user know quicker?
Expand Down
Loading