diff --git a/api/v1alpha1/conditions.go b/api/v1alpha1/conditions.go new file mode 100644 index 00000000..6e0ef6f3 --- /dev/null +++ b/api/v1alpha1/conditions.go @@ -0,0 +1,181 @@ +package v1alpha1 + +import ( + "fmt" + "strings" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + // TestRunRunning indicates if the test run is currently running. + // - if empty / Unknown, it's any stage before k6 resume (starter) + // - if False, it's after all runners have finished successfully or with error + // - if True, it's after successful starter but before all runners have finished + TestRunRunning = "TestRunRunning" + + // CloudTestRun indicates if this test run is supposed to be a cloud test run. + // - if empty / Unknown, the type of test is unknown yet + // - if False, it is not a cloud test run + // - if True, it is a cloud test run + CloudTestRun = "CloudTestRun" + + // CloudTestRunCreated indicates if k6 Cloud test run ID has been created for this test. + // - if empty / Unknown, it's either a non-cloud test run or it is a cloud test run + // that wasn't created yet + // - if False, it is a cloud test run and it is yet to be created + // - if True, it is a cloud test run and it has been created already + CloudTestRunCreated = "CloudTestRunCreated" + + // CloudTestRunFinalized indicates if k6 Cloud test run has been finalized. + // - if empty / Unknown, it's either a non-cloud test run or it is a cloud test run + // that wasn't finalized yet + // - if False, it's a cloud test run and it is yet to be finalized + // - if True, it's a cloud test run that has been finalized already + CloudTestRunFinalized = "CloudTestRunFinalized" +) + +var reasons = map[string]string{ + "TestRunRunningUnknown": "TestRunPreparation", + "TestRunRunningTrue": "TestRunRunningTrue", + "TestRunRunningFalse": "TestRunRunningFalse", + + "CloudTestRunUnknown": "TestRunTypeUnknown", + "CloudTestRunTrue": "CloudTestRunTrue", + "CloudTestRunFalse": "CloudTestRunFalse", + + "CloudTestRunCreatedUnknown": "CloudTestRunCreatedUnknown", + "CloudTestRunCreatedTrue": "CloudTestRunCreatedTrue", + "CloudTestRunCreatedFalse": "CloudTestRunCreatedFalse", + + "CloudTestRunFinalizedUnknown": "CloudTestRunFinalizedUnknown", + "CloudTestRunFinalizedTrue": "CloudTestRunFinalizedTrue", + "CloudTestRunFinalizedFalse": "CloudTestRunFinalizedFalse", +} + +// InitializeConditions defines only conditions common to all test runs. +func (k6 *K6) InitializeConditions() { + t := metav1.Now() + k6.Status.Conditions = []metav1.Condition{ + metav1.Condition{ + Type: CloudTestRun, + Status: metav1.ConditionUnknown, + LastTransitionTime: t, + Reason: "TestRunTypeUnknown", + Message: "", + }, + metav1.Condition{ + Type: TestRunRunning, + Status: metav1.ConditionUnknown, + LastTransitionTime: t, + Reason: "TestRunPreparation", + Message: "", + }, + } +} + +func (k6 *K6) UpdateCondition(conditionType string, conditionStatus metav1.ConditionStatus) { + reason, ok := reasons[conditionType+string(conditionStatus)] + if !ok { + panic(fmt.Sprintf("Invalid condition type and status! 
`%s` - this should never happen!", conditionType+string(conditionStatus))) + } + meta.SetStatusCondition(&k6.Status.Conditions, metav1.Condition{ + Type: conditionType, + Status: conditionStatus, + LastTransitionTime: metav1.Now(), + Reason: reason, + Message: "", + }) +} + +func (k6 K6) IsTrue(conditionType string) bool { + return meta.IsStatusConditionTrue(k6.Status.Conditions, conditionType) +} + +func (k6 K6) IsFalse(conditionType string) bool { + return meta.IsStatusConditionFalse(k6.Status.Conditions, conditionType) +} + +func (k6 K6) IsUnknown(conditionType string) bool { + return !k6.IsFalse(conditionType) && !k6.IsTrue(conditionType) +} + +func (k6 K6) LastUpdate(conditionType string) (time.Time, bool) { + cond := meta.FindStatusCondition(k6.Status.Conditions, conditionType) + if cond != nil { + return cond.LastTransitionTime.Time, true + } + return time.Now(), false +} + +// SetIfNewer changes k6status only if changes in proposedStatus are consistent +// with the expected progression of a test run. If there were any acceptable +// changes proposed, it returns true. +func (k6status *K6Status) SetIfNewer(proposedStatus K6Status) (isNewer bool) { + existingConditions := map[string]metav1.Condition{} + for i := range k6status.Conditions { + existingConditions[k6status.Conditions[i].Type] = k6status.Conditions[i] + } + + for _, proposedCondition := range proposedStatus.Conditions { + // If a new condition is being proposed, just add it to the list. + if existingCondition, ok := existingConditions[proposedCondition.Type]; !ok { + k6status.Conditions = append(k6status.Conditions, proposedCondition) + isNewer = true + } else { + // If a change in existing condition is being proposed, check if + // its timestamp is later than the one in existing condition. + // + // Additionally: condition should never return to Unknown status + // unless it's newly created. + + if proposedCondition.Status != metav1.ConditionUnknown { + if existingCondition.LastTransitionTime.UnixNano() < proposedCondition.LastTransitionTime.UnixNano() { + meta.SetStatusCondition(&k6status.Conditions, proposedCondition) + isNewer = true + } + } + } + + // Accept change of test run ID only if it's not set yet and together with + // corresponding condition. + if proposedCondition.Type == CloudTestRunCreated && + len(k6status.TestRunID) == 0 && + len(proposedStatus.TestRunID) > 0 { + k6status.TestRunID = proposedStatus.TestRunID + isNewer = true + } + // log if proposedStatus.TestRunID is empty here? + } + + // If a change in stage is proposed, confirm that it is consistent with + // expected flow of any test run. 
+ if k6status.Stage != proposedStatus.Stage && len(proposedStatus.Stage) > 0 { + switch k6status.Stage { + case "", "initialization": + k6status.Stage = proposedStatus.Stage + isNewer = true + + case "initialized": + if !strings.HasPrefix(string(proposedStatus.Stage), "init") { + k6status.Stage = proposedStatus.Stage + isNewer = true + } + case "created": + if proposedStatus.Stage == "started" || proposedStatus.Stage == "finished" || proposedStatus.Stage == "error" { + k6status.Stage = proposedStatus.Stage + isNewer = true + } + case "started": + if proposedStatus.Stage == "finished" || proposedStatus.Stage == "error" { + k6status.Stage = proposedStatus.Stage + isNewer = true + } + // in case of finished or error stage, skip + } + } + + return +} diff --git a/api/v1alpha1/k6_types.go b/api/v1alpha1/k6_types.go index 1d5374ef..6a387c39 100644 --- a/api/v1alpha1/k6_types.go +++ b/api/v1alpha1/k6_types.go @@ -102,6 +102,8 @@ type Stage string type K6Status struct { Stage Stage `json:"stage,omitempty"` TestRunID string `json:"testRunId,omitempty"` + + Conditions []metav1.Condition `json:"conditions,omitempty"` } // K6 is the Schema for the k6s API diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 3b83fa6e..b8af93ff 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -23,6 +23,7 @@ package v1alpha1 import ( "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -32,7 +33,7 @@ func (in *K6) DeepCopyInto(out *K6) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new K6. @@ -164,6 +165,13 @@ func (in *K6Spec) DeepCopy() *K6Spec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *K6Status) DeepCopyInto(out *K6Status) { *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]metav1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new K6Status. diff --git a/config/crd/bases/k6.io_k6s.yaml b/config/crd/bases/k6.io_k6s.yaml index f4c94f29..d904bb93 100644 --- a/config/crd/bases/k6.io_k6s.yaml +++ b/config/crd/bases/k6.io_k6s.yaml @@ -4064,6 +4064,76 @@ spec: status: description: K6Status defines the observed state of K6 properties: + conditions: + items: + description: "Condition contains details for one aspect of the current + state of this API Resource. --- This struct is intended for direct + use as an array at the field path .status.conditions. For example, + \n \ttype FooStatus struct{ \t // Represents the observations + of a foo's current state. \t // Known .status.conditions.type + are: \"Available\", \"Progressing\", and \"Degraded\" \t // + +patchMergeKey=type \t // +patchStrategy=merge \t // +listType=map + \t // +listMapKey=type \t Conditions []metav1.Condition + `json:\"conditions,omitempty\" patchStrategy:\"merge\" patchMergeKey:\"type\" + protobuf:\"bytes,1,rep,name=conditions\"` \n \t // other fields + \t}" + properties: + lastTransitionTime: + description: lastTransitionTime is the last time the condition + transitioned from one status to another. 
This should be when + the underlying condition changed. If that is not known, then + using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: message is a human readable message indicating + details about the transition. This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: observedGeneration represents the .metadata.generation + that the condition was set based upon. For instance, if .metadata.generation + is currently 12, but the .status.conditions[x].observedGeneration + is 9, the condition is out of date with respect to the current + state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: reason contains a programmatic identifier indicating + the reason for the condition's last transition. Producers + of specific condition types may define expected values and + meanings for this field, and whether the values are considered + a guaranteed API. The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + --- Many .condition.type values are consistent across resources + like Available, but because arbitrary conditions can be useful + (see .node.status.conditions), the ability to deconflict is + important. The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array stage: description: Stage describes which stage of the test execution lifecycle our runners are in diff --git a/controllers/k6_controller.go b/controllers/k6_controller.go index ab188ed5..4b5719f2 100644 --- a/controllers/k6_controller.go +++ b/controllers/k6_controller.go @@ -17,22 +17,26 @@ package controllers import ( "context" "fmt" + "time" - v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/builder" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" "github.com/go-logr/logr" "github.com/grafana/k6-operator/api/v1alpha1" + "github.com/grafana/k6-operator/pkg/cloud" batchv1 "k8s.io/api/batch/v1" - "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/api/core/v1" + k8sErrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" ) const k6CrLabelName = "k6_cr" @@ -51,15 +55,14 @@ type K6Reconciler struct { // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list; // +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete // 
+kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;create;update -func (r *K6Reconciler) Reconcile(_ context.Context, req ctrl.Request) (ctrl.Result, error) { - ctx := context.Background() - log := r.Log.WithValues("namespace", req.Namespace, "name", req.Name) +func (r *K6Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := r.Log.WithValues("namespace", req.Namespace, "name", req.Name, "reconcileID", controller.ReconcileIDFromContext(ctx)) // Fetch the CRD k6 := &v1alpha1.K6{} err := r.Get(ctx, req.NamespacedName, k6) if err != nil { - if errors.IsNotFound(err) { + if k8sErrors.IsNotFound(err) { log.Info("Request deleted. Nothing to reconcile.") return ctrl.Result{}, nil } @@ -69,18 +72,124 @@ func (r *K6Reconciler) Reconcile(_ context.Context, req ctrl.Request) (ctrl.Resu log.Info(fmt.Sprintf("Reconcile(); stage = %s", k6.Status.Stage)) + // Decision making here is now a mix between stages and conditions. + // TODO: refactor further. + switch k6.Status.Stage { case "": - return InitializeJobs(ctx, log, k6, r) + log.Info("Initialize test") + + k6.InitializeConditions() + + if _, err := r.UpdateStatus(ctx, k6, log); err != nil { + return ctrl.Result{}, err + } + + log.Info("Changing stage of K6 status to initialization") + k6.Status.Stage = "initialization" + if updateHappened, err := r.UpdateStatus(ctx, k6, log); err != nil { + return ctrl.Result{}, err + } else if updateHappened { + return InitializeJobs(ctx, log, k6, r) + } + return ctrl.Result{}, nil + case "initialization": - return RunValidations(ctx, log, k6, r) + if k6.IsUnknown(v1alpha1.CloudTestRun) { + return RunValidations(ctx, log, k6, r) + } + + if k6.IsFalse(v1alpha1.CloudTestRun) { + // RunValidations has already happened and this is not a + // cloud test: we can move on + log.Info("Changing stage of K6 status to initialized") + + k6.Status.Stage = "initialized" + + if updateHappened, err := r.UpdateStatus(ctx, k6, log); err != nil { + return ctrl.Result{}, err + } else if updateHappened { + return ctrl.Result{}, nil + } + } + + // log.Info(fmt.Sprintf("Debug \"initialization\" %v %v", + // k6.IsTrue(v1alpha1.CloudTestRun), + // k6.IsTrue(v1alpha1.CloudTestRunCreated))) + + if k6.IsTrue(v1alpha1.CloudTestRun) { + + if k6.IsFalse(v1alpha1.CloudTestRunCreated) { + return SetupCloudTest(ctx, log, k6, r) + + } else { + // if test run was created, then only changing status is left + log.Info("Changing stage of K6 status to initialized") + + k6.Status.Stage = "initialized" + + if _, err := r.UpdateStatus(ctx, k6, log); err != nil { + return ctrl.Result{}, err + } + } + } + + return ctrl.Result{}, nil + case "initialized": return CreateJobs(ctx, log, k6, r) + case "created": return StartJobs(ctx, log, k6, r) + case "started": - // wait for test to finish and then mark as finished - return FinishJobs(ctx, log, k6, r) + // log.Info(fmt.Sprintf("Debug \"started\" %v %v", + // k6.IsTrue(v1alpha1.CloudTestRun), + // k6.IsTrue(v1alpha1.CloudTestRunFinalized))) + + if k6.IsTrue(v1alpha1.CloudTestRun) && k6.IsTrue(v1alpha1.CloudTestRunFinalized) { + // a fluke - nothing to do + return ctrl.Result{}, nil + } + + // wait for the test to finish + if !FinishJobs(ctx, log, k6, r) { + // Test runs can take a long time and usually they aren't supposed + // to be too quick. So check in only periodically. 
+ return ctrl.Result{RequeueAfter: time.Second * 15}, nil + } + + log.Info("All runner pods are finished") + + // now mark it as finished + + if k6.IsTrue(v1alpha1.TestRunRunning) { + k6.UpdateCondition(v1alpha1.TestRunRunning, metav1.ConditionFalse) + + log.Info("Changing stage of K6 status to finished") + k6.Status.Stage = "finished" + + // If this is a test run with cloud output, try to finalize it. + if k6.IsTrue(v1alpha1.CloudTestRun) && k6.IsFalse(v1alpha1.CloudTestRunFinalized) { + if err = cloud.FinishTestRun(k6.Status.TestRunID); err != nil { + log.Error(err, "Failed to finalize the test run with cloud output") + return ctrl.Result{}, nil + } else { + log.Info(fmt.Sprintf("Cloud test run %s was finalized succesfully", k6.Status.TestRunID)) + + k6.UpdateCondition(v1alpha1.CloudTestRunFinalized, metav1.ConditionTrue) + } + } + + _, err := r.UpdateStatus(ctx, k6, log) + if err != nil { + return ctrl.Result{}, err + } + // log.Info(fmt.Sprintf("Debug updating status after finalize %v", updateHappened)) + } + + return ctrl.Result{}, nil + case "error", "finished": // delete if configured if k6.Spec.Cleanup == "post" { @@ -124,5 +233,49 @@ func (r *K6Reconciler) SetupWithManager(mgr ctrl.Manager) error { } return true }))). + WithOptions(controller.Options{ + MaxConcurrentReconciles: 1, + // RateLimiter - ? + }). Complete(r) } + +func (r *K6Reconciler) UpdateStatus(ctx context.Context, k6 *v1alpha1.K6, log logr.Logger) (updateHappened bool, err error) { + proposedStatus := k6.Status + + // re-fetch resource + err = r.Get(ctx, types.NamespacedName{Namespace: k6.Namespace, Name: k6.Name}, k6) + if err != nil { + if k8sErrors.IsNotFound(err) { + log.Info("Request deleted. No status to update.") + return false, nil + } + log.Error(err, "Could not fetch request") + return false, err + } + + cleanObj := k6.DeepCopyObject().(client.Object) + + // Update only if it's truly a newer version of the resource + // in comparison to the recently fetched resource. + isNewer := k6.Status.SetIfNewer(proposedStatus) + if !isNewer { + return false, nil + } + + err = r.Client.Status().Patch(ctx, k6, client.MergeFrom(cleanObj)) + + // TODO: look into retry.RetryOnConflict(retry.DefaultRetry, func() error{...}) + // to have retries of failing update here, in case of conflicts; + // with optional retry bool arg probably. + + // TODO: what if resource was deleted right before Patch? + // Add a check for IsNotFound(err). 
+ + if err != nil { + log.Error(err, "Could not update status of custom resource") + return false, err + } + + return true, nil +} diff --git a/controllers/k6_create.go b/controllers/k6_create.go index 63ccdae1..b36b2582 100644 --- a/controllers/k6_create.go +++ b/controllers/k6_create.go @@ -3,6 +3,7 @@ package controllers import ( "context" "fmt" + "time" "github.com/go-logr/logr" "github.com/grafana/k6-operator/api/v1alpha1" @@ -10,10 +11,8 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" ) // CreateJobs creates jobs that will spawn k6 pods for distributed test @@ -24,38 +23,20 @@ func CreateJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reco token string // only for cloud output tests ) - if len(k6.Status.TestRunID) > 0 { + if k6.IsTrue(v1alpha1.CloudTestRun) && k6.IsTrue(v1alpha1.CloudTestRunCreated) { log = log.WithValues("testRunId", k6.Status.TestRunID) - var ( - secrets corev1.SecretList - secretOpts = &client.ListOptions{ - // TODO: find out a better way to get namespace here - Namespace: "k6-operator-system", - LabelSelector: labels.SelectorFromSet(map[string]string{ - "k6cloud": "token", - }), - } - ) - if err := r.List(ctx, &secrets, secretOpts); err != nil { - log.Error(err, "Failed to load k6 Cloud token") - return res, err + var tokenReady bool + token, tokenReady, err = loadToken(ctx, log, r) + if err != nil { + // An error here means a very likely mis-configuration of the token. + // Consider updating status to error to let a user know quicker? + log.Error(err, "A problem while getting token.") + return ctrl.Result{}, nil } - - if len(secrets.Items) < 1 { - err := fmt.Errorf("There are no secrets to hold k6 Cloud token") - log.Error(err, err.Error()) - return res, err - } - - if t, ok := secrets.Items[0].Data["token"]; !ok { - err := fmt.Errorf("The secret doesn't have a field token for k6 Cloud") - log.Error(err, err.Error()) - return res, err - } else { - token = string(t) + if !tokenReady { + return ctrl.Result{RequeueAfter: time.Second * 5}, nil } - log.Info("Token for k6 Cloud was loaded.") } log.Info("Creating test jobs") @@ -66,11 +47,12 @@ func CreateJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reco log.Info("Changing stage of K6 status to created") k6.Status.Stage = "created" - if err = r.Client.Status().Update(ctx, k6); err != nil { - log.Error(err, "Could not update status of custom resource") - return ctrl.Result{}, nil - } + if updateHappened, err := r.UpdateStatus(ctx, k6, log); err != nil { + return ctrl.Result{}, err + } else if updateHappened { + return ctrl.Result{Requeue: true}, nil + } return ctrl.Result{}, nil } diff --git a/controllers/k6_finish.go b/controllers/k6_finish.go index 3eaf464f..dad2bfab 100644 --- a/controllers/k6_finish.go +++ b/controllers/k6_finish.go @@ -6,23 +6,19 @@ import ( "github.com/go-logr/logr" "github.com/grafana/k6-operator/api/v1alpha1" - "github.com/grafana/k6-operator/pkg/cloud" - "github.com/grafana/k6-operator/pkg/types" batchv1 "k8s.io/api/batch/v1" "k8s.io/apimachinery/pkg/labels" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) -// FinishJobs waits for the pods to finish, performs finishing call for cloud output and moves state to "finished". 
-func FinishJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (ctrl.Result, error) { +// FinishJobs checks if the runners pods have finished execution. +func FinishJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (allFinished bool) { if len(k6.Status.TestRunID) > 0 { log = log.WithValues("testRunId", k6.Status.TestRunID) } log.Info("Checking if all runner pods are finished") - var err error selector := labels.SelectorFromSet(map[string]string{ "app": "k6", "k6_cr": k6.Name, @@ -31,10 +27,11 @@ func FinishJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reco opts := &client.ListOptions{LabelSelector: selector, Namespace: k6.Namespace} jl := &batchv1.JobList{} + var err error - if err := r.List(ctx, jl, opts); err != nil { + if err = r.List(ctx, jl, opts); err != nil { log.Error(err, "Could not list jobs") - return ctrl.Result{}, nil + return } // TODO: We should distinguish between Suceeded/Failed/Unknown @@ -49,26 +46,9 @@ func FinishJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reco log.Info(fmt.Sprintf("%d/%d jobs complete", finished, k6.Spec.Parallelism)) if finished < k6.Spec.Parallelism { - return ctrl.Result{}, nil - } - - log.Info("All runner pods are finished") - - // If this is a test run with cloud output, try to finalize it regardless. - if cli := types.ParseCLI(&k6.Spec); cli.HasCloudOut { - if err = cloud.FinishTestRun(k6.Status.TestRunID); err != nil { - log.Error(err, "Could not finalize the test run with cloud output") - } else { - log.Info(fmt.Sprintf("Cloud test run %s was finalized succesfully", k6.Status.TestRunID)) - } - } - - log.Info("Changing stage of K6 status to finished") - k6.Status.Stage = "finished" - if err = r.Client.Status().Update(ctx, k6); err != nil { - log.Error(err, "Could not update status of custom resource") - return ctrl.Result{}, err + return } - return ctrl.Result{}, nil + allFinished = true + return } diff --git a/controllers/k6_initialize.go b/controllers/k6_initialize.go index cda07836..cc69f005 100644 --- a/controllers/k6_initialize.go +++ b/controllers/k6_initialize.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/k6-operator/pkg/resources/jobs" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -25,14 +26,8 @@ import ( // InitializeJobs creates jobs that will run initial checks for distributed test if any are necessary func InitializeJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (res ctrl.Result, err error) { - log.Info("Initialize test") - - log.Info("Changing stage of K6 status to initialization") - k6.Status.Stage = "initialization" - if err = r.Client.Status().Update(ctx, k6); err != nil { - log.Error(err, "Could not update status of custom resource") - return - } + // initializer is a quick job so check in frequently + res = ctrl.Result{RequeueAfter: time.Second * 5} cli := types.ParseCLI(&k6.Spec) @@ -58,8 +53,117 @@ func InitializeJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6 } func RunValidations(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (res ctrl.Result, err error) { + // initializer is a quick job so check in frequently + res = ctrl.Result{RequeueAfter: time.Second * 5} + cli := types.ParseCLI(&k6.Spec) + inspectOutput, inspectReady, err := inspectTestRun(ctx, log, *k6, r) + if err != nil { + // inspectTestRun made a 
log message already so just return without requeue + return ctrl.Result{}, nil + } + if !inspectReady { + return res, nil + } + + log.Info(fmt.Sprintf("k6 inspect: %+v", inspectOutput)) + + if int32(inspectOutput.MaxVUs) < k6.Spec.Parallelism { + err = fmt.Errorf("number of instances > number of VUs") + // TODO maybe change this to a warning and simply set parallelism = maxVUs and proceed with execution? + // But logr doesn't seem to have warning level by default, only with V() method... + // It makes sense to return to this after / during logr VS logrus issue https://github.com/grafana/k6-operator/issues/84 + log.Error(err, "Parallelism argument cannot be larger than maximum VUs in the script", + "maxVUs", inspectOutput.MaxVUs, + "parallelism", k6.Spec.Parallelism) + + k6.Status.Stage = "error" + + if _, err := r.UpdateStatus(ctx, k6, log); err != nil { + return ctrl.Result{}, err + } + + // Don't requeue in case of this error; unless it's made into a warning as described above. + return ctrl.Result{}, nil + } + + if cli.HasCloudOut { + k6.UpdateCondition(v1alpha1.CloudTestRun, metav1.ConditionTrue) + k6.UpdateCondition(v1alpha1.CloudTestRunCreated, metav1.ConditionFalse) + k6.UpdateCondition(v1alpha1.CloudTestRunFinalized, metav1.ConditionFalse) + } else { + k6.UpdateCondition(v1alpha1.CloudTestRun, metav1.ConditionFalse) + } + + if _, err := r.UpdateStatus(ctx, k6, log); err != nil { + return ctrl.Result{}, err + } + + return res, nil +} + +func SetupCloudTest(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (res ctrl.Result, err error) { + res = ctrl.Result{RequeueAfter: time.Second * 5} + + inspectOutput, inspectReady, err := inspectTestRun(ctx, log, *k6, r) + if err != nil { + // This *shouldn't* fail since it was already done once. Don't requeue. + // Alternatively: store inspect options in K6 Status? Get rid off reading logs? + return ctrl.Result{}, nil + } + if !inspectReady { + return res, nil + } + + token, tokenReady, err := loadToken(ctx, log, r) + if err != nil { + // An error here means a very likely mis-configuration of the token. + // Consider updating status to error to let a user know quicker? + log.Error(err, "A problem while getting token.") + return ctrl.Result{}, nil + } + if !tokenReady { + return res, nil + } + + host := getEnvVar(k6.Spec.Runner.Env, "K6_CLOUD_HOST") + + if k6.IsFalse(v1alpha1.CloudTestRunCreated) { + + // If CloudTestRunCreated has just been updated, wait for a bit before + // acting, to avoid race condition between different reconcile loops. + t, _ := k6.LastUpdate(v1alpha1.CloudTestRunCreated) + if time.Now().Sub(t) < 5*time.Second { + return ctrl.Result{RequeueAfter: time.Second * 2}, nil + } + + if refID, err := cloud.CreateTestRun(inspectOutput, k6.Spec.Parallelism, host, token, log); err != nil { + log.Error(err, "Failed to create a new cloud test run.") + return res, nil + } else { + log = log.WithValues("testRunId", refID) + log.Info(fmt.Sprintf("Created cloud test run: %s", refID)) + + k6.Status.TestRunID = refID + k6.UpdateCondition(v1alpha1.CloudTestRunCreated, metav1.ConditionTrue) + + _, err := r.UpdateStatus(ctx, k6, log) + // log.Info(fmt.Sprintf("Debug updating status after create %v", updateHappened)) + if err != nil { + return ctrl.Result{}, err + } + } + } + + return ctrl.Result{}, nil +} + +// It may take some time to retrieve inspect output so indicate with boolean if it's ready +// and use returnErr only for errors that require a change of behaviour. All other errors +// should just be logged. 
+func inspectTestRun(ctx context.Context, log logr.Logger, k6 v1alpha1.K6, r *K6Reconciler) ( + inspectOutput cloud.InspectOutput, ready bool, returnErr error) { var ( listOpts = &client.ListOptions{ Namespace: k6.Namespace, @@ -70,20 +174,21 @@ func RunValidations(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6 }), } podList = &corev1.PodList{} + err error ) if err = r.List(ctx, podList, listOpts); err != nil { log.Error(err, "Could not list pods") - return ctrl.Result{}, err + return } if len(podList.Items) < 1 { log.Info("No initializing pod found yet") - return ctrl.Result{}, err + return } // there should be only 1 initializer pod if podList.Items[0].Status.Phase != "Succeeded" { log.Info("Waiting for initializing pod to finish") - return ctrl.Result{}, err + return } // Here we need to get the output of the pod @@ -91,18 +196,18 @@ func RunValidations(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6 // recommended to use REST client instead: // https://github.com/kubernetes-sigs/controller-runtime/issues/1229 + // TODO: if the below errors repeat several times, it'd be a real error case scenario. + // How likely is it? Should we track frequency of these errors here? config, err := rest.InClusterConfig() if err != nil { log.Error(err, "unable to fetch in-cluster REST config") - // don't return here - return ctrl.Result{}, err + return } clientset, err := kubernetes.NewForConfig(config) if err != nil { log.Error(err, "unable to get access to clientset") - // don't return here - return ctrl.Result{}, err + return } req := clientset.CoreV1().Pods(k6.Namespace).GetLogs(podList.Items[0].Name, &corev1.PodLogOptions{ Container: "k6", @@ -113,8 +218,7 @@ func RunValidations(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6 podLogs, err := req.Stream(ctx) if err != nil { log.Error(err, "unable to stream logs from the pod") - // don't return here - return ctrl.Result{}, err + return } defer podLogs.Close() @@ -122,98 +226,61 @@ func RunValidations(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6 _, err = io.Copy(buf, podLogs) if err != nil { log.Error(err, "unable to copy logs from the pod") - return ctrl.Result{}, err + return } - var ( - testRunId string - token string - inspectOutput cloud.InspectOutput - ) - - if err := json.Unmarshal(buf.Bytes(), &inspectOutput); err != nil { + if returnErr = json.Unmarshal(buf.Bytes(), &inspectOutput); returnErr != nil { // this shouldn't normally happen but if it does, let's log output by default - log.Error(err, fmt.Sprintf("unable to marshal: `%s`", buf.String())) - return ctrl.Result{}, err + log.Error(returnErr, fmt.Sprintf("unable to marshal: `%s`", buf.String())) } - log.Info(fmt.Sprintf("k6 inspect: %+v", inspectOutput)) - - if int32(inspectOutput.MaxVUs) < k6.Spec.Parallelism { - err = fmt.Errorf("number of instances > number of VUs") - // TODO maybe change this to a warning and simply set parallelism = maxVUs and proceed with execution? - // But logr doesn't seem to have warning level by default, only with V() method... 
- // It makes sense to return to this after / during logr VS logrus issue https://github.com/grafana/k6-operator/issues/84 - log.Error(err, "Parallelism argument cannot be larger than maximum VUs in the script", - "maxVUs", inspectOutput.MaxVUs, - "parallelism", k6.Spec.Parallelism) + ready = true + return +} - k6.Status.Stage = "error" - if err = r.Client.Status().Update(ctx, k6); err != nil { - log.Error(err, "Could not update status of custom resource") - return +// Similarly to inspectTestRun, there may be some errors during load of token +// that should be just waited out. But other errors should result in change of +// behaviour in the caller. +// ready shows whether token was loaded yet, while returnErr indicates an error +// that should be acted on. +func loadToken(ctx context.Context, log logr.Logger, r *K6Reconciler) (token string, ready bool, returnErr error) { + var ( + secrets corev1.SecretList + secretOpts = &client.ListOptions{ + // TODO: find out a better way to get namespace here + Namespace: "k6-operator-system", + LabelSelector: labels.SelectorFromSet(map[string]string{ + "k6cloud": "token", + }), } - return ctrl.Result{}, err - } + err error + ) - if err != nil { - log.Error(err, "Failed to initialize the script") + if err = r.List(ctx, &secrets, secretOpts); err != nil { + log.Error(err, "Failed to load k6 Cloud token") + // This may be a networking issue, etc. so just retry. return } - if cli.HasCloudOut { - var ( - secrets corev1.SecretList - secretOpts = &client.ListOptions{ - // TODO: find out a better way to get namespace here - Namespace: "k6-operator-system", - LabelSelector: labels.SelectorFromSet(map[string]string{ - "k6cloud": "token", - }), - } - ) - if err := r.List(ctx, &secrets, secretOpts); err != nil { - log.Error(err, "Failed to load k6 Cloud token") - return res, err - } - - if len(secrets.Items) < 1 { - err := fmt.Errorf("There are no secrets to hold k6 Cloud token") - log.Error(err, err.Error()) - return res, err - } - - if t, ok := secrets.Items[0].Data["token"]; !ok { - err := fmt.Errorf("The secret doesn't have a field token for k6 Cloud") - log.Error(err, err.Error()) - return res, err - } else { - token = string(t) - } - log.Info("Token for k6 Cloud was loaded.") - - host := getEnvVar(k6.Spec.Runner.Env, "K6_CLOUD_HOST") - - if refID, err := cloud.CreateTestRun(inspectOutput, k6.Spec.Parallelism, host, token, log); err != nil { - return res, err - } else { - testRunId = refID - log = log.WithValues("testRunId", k6.Status.TestRunID) - log.Info(fmt.Sprintf("Created cloud test run: %s", testRunId)) - } + if len(secrets.Items) < 1 { + // we should stop execution in case of this error + returnErr = fmt.Errorf("There are no secrets to hold k6 Cloud token") + log.Error(returnErr, err.Error()) + return } - log.Info("Changing stage of K6 status to initialized") - - k6.Status.Stage = "initialized" - k6.Status.TestRunID = testRunId - - if err = r.Client.Status().Update(ctx, k6); err != nil { - log.Error(err, "Could not update status of custom resource") + if t, ok := secrets.Items[0].Data["token"]; !ok { + // we should stop execution in case of this error + returnErr = fmt.Errorf("The secret doesn't have a field token for k6 Cloud") + log.Error(err, err.Error()) return + } else { + token = string(t) + ready = true + log.Info("Token for k6 Cloud was loaded.") } - return res, nil + return } func getEnvVar(vars []corev1.EnvVar, name string) string { diff --git a/controllers/k6_start.go b/controllers/k6_start.go index c963d6bc..07fba7f9 100644 --- 
a/controllers/k6_start.go +++ b/controllers/k6_start.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/k6-operator/api/v1alpha1" "github.com/grafana/k6-operator/pkg/resources/jobs" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -27,7 +28,10 @@ func isServiceReady(log logr.Logger, service *v1.Service) bool { } // StartJobs in the Ready phase using a curl container -func StartJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (ctrl.Result, error) { +func StartJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (res ctrl.Result, err error) { + // It may take some time to get Services up, so check in frequently + res = ctrl.Result{RequeueAfter: time.Second} + if len(k6.Status.TestRunID) > 0 { log = log.WithValues("testRunId", k6.Status.TestRunID) } @@ -42,9 +46,9 @@ func StartJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Recon opts := &client.ListOptions{LabelSelector: selector, Namespace: k6.Namespace} pl := &v1.PodList{} - if e := r.List(ctx, pl, opts); e != nil { - log.Error(e, "Could not list pods") - return ctrl.Result{}, e + if err = r.List(ctx, pl, opts); err != nil { + log.Error(err, "Could not list pods") + return res, nil } var count int @@ -58,16 +62,15 @@ func StartJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Recon log.Info(fmt.Sprintf("%d/%d runner pods ready", count, k6.Spec.Parallelism)) if count != int(k6.Spec.Parallelism) { - return ctrl.Result{}, nil + return res, nil } var hostnames []string - var err error sl := &v1.ServiceList{} - if e := r.List(ctx, sl, opts); e != nil { - log.Error(e, "Could not list services") - return ctrl.Result{}, e + if err = r.List(ctx, sl, opts); err != nil { + log.Error(err, "Could not list services") + return res, nil } for _, service := range sl.Items { @@ -75,7 +78,7 @@ func StartJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Recon if !isServiceReady(log, &service) { log.Info(fmt.Sprintf("%v service is not ready, aborting", service.ObjectMeta.Name)) - return ctrl.Result{RequeueAfter: time.Second}, nil + return res, nil } else { log.Info(fmt.Sprintf("%v service is ready", service.ObjectMeta.Name)) } @@ -87,24 +90,23 @@ func StartJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Recon log.Error(err, "Failed to set controller reference for the start job") } + // TODO: add a check for existence of starter job + if err = r.Create(ctx, starter); err != nil { log.Error(err, "Failed to launch k6 test starter") - return ctrl.Result{}, err + return res, nil } - log.Info("Created started job") - - if err != nil { - log.Error(err, "Failed to start all jobs") - return ctrl.Result{}, err - } + log.Info("Created starter job") log.Info("Changing stage of K6 status to started") k6.Status.Stage = "started" - if err = r.Client.Status().Update(ctx, k6); err != nil { - log.Error(err, "Could not update status of custom resource") + k6.UpdateCondition(v1alpha1.TestRunRunning, metav1.ConditionTrue) + + if updateHappened, err := r.UpdateStatus(ctx, k6, log); err != nil { return ctrl.Result{}, err + } else if updateHappened { + return ctrl.Result{Requeue: true}, nil } - return ctrl.Result{}, nil }
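
The condition helpers added in conditions.go are thin wrappers over the apimachinery meta helpers. Below is a minimal sketch of the intended lifecycle for a plain (non-cloud) test run, using only symbols introduced in this diff; the import path is the one the controller already uses, and the sequence of calls mirrors the reconciler stages above.

```go
package main

import (
	"fmt"

	"github.com/grafana/k6-operator/api/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	var k6 v1alpha1.K6

	// Right after the CR is picked up: the common conditions start as Unknown.
	k6.InitializeConditions()
	fmt.Println(k6.IsUnknown(v1alpha1.CloudTestRun))   // true
	fmt.Println(k6.IsUnknown(v1alpha1.TestRunRunning)) // true

	// RunValidations decides this is not a cloud test run.
	k6.UpdateCondition(v1alpha1.CloudTestRun, metav1.ConditionFalse)

	// StartJobs flips TestRunRunning to True once the starter job is created.
	k6.UpdateCondition(v1alpha1.TestRunRunning, metav1.ConditionTrue)

	// When FinishJobs reports all runners done, the run is marked as not running.
	k6.UpdateCondition(v1alpha1.TestRunRunning, metav1.ConditionFalse)

	if t, ok := k6.LastUpdate(v1alpha1.TestRunRunning); ok {
		fmt.Println("TestRunRunning last transitioned at", t)
	}
}
```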
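
SetIfNewer is the guard that keeps overlapping reconcile loops from rolling the status backwards. A small illustration of the two rules it enforces (a condition never regresses to Unknown, and the stage only moves along the expected progression); the concrete values here are made up for the example.

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/k6-operator/api/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	now := metav1.Now()
	later := metav1.NewTime(now.Add(10 * time.Second))

	// Status currently stored in the cluster: jobs created, cloud condition known.
	current := v1alpha1.K6Status{
		Stage: "created",
		Conditions: []metav1.Condition{{
			Type:               v1alpha1.CloudTestRun,
			Status:             metav1.ConditionTrue,
			LastTransitionTime: now,
			Reason:             "CloudTestRunTrue",
		}},
	}

	// A stale reconcile loop proposes moving the condition back to Unknown and
	// the stage back to initialization; even a later timestamp does not help.
	stale := v1alpha1.K6Status{
		Stage: "initialization",
		Conditions: []metav1.Condition{{
			Type:               v1alpha1.CloudTestRun,
			Status:             metav1.ConditionUnknown,
			LastTransitionTime: later,
			Reason:             "TestRunTypeUnknown",
		}},
	}
	fmt.Println(current.SetIfNewer(stale)) // false: both regressions are rejected

	// A genuinely newer status (created -> started) is accepted.
	newer := v1alpha1.K6Status{Stage: "started"}
	fmt.Println(current.SetIfNewer(newer), current.Stage) // true started
}
```

Pairing this merge rule with the MergeFrom patch in UpdateStatus and MaxConcurrentReconciles: 1 is what keeps the status monotonic even when several requeues race.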
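
The retry.RetryOnConflict TODO in UpdateStatus could be filled in roughly as below. This is only a sketch under the assumption that the same re-fetch, SetIfNewer, and MergeFrom patch flow is kept; patchStatusWithRetry is a hypothetical name, not part of this change.

```go
package controllers

import (
	"context"

	"github.com/grafana/k6-operator/api/v1alpha1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// patchStatusWithRetry re-fetches the CR on every attempt, re-applies the
// proposed status with SetIfNewer, and patches; RetryOnConflict repeats the
// closure only when the API server returns a conflict.
func (r *K6Reconciler) patchStatusWithRetry(ctx context.Context, k6 *v1alpha1.K6) error {
	proposed := k6.Status

	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Always work on the freshest copy of the resource.
		if err := r.Get(ctx, types.NamespacedName{Namespace: k6.Namespace, Name: k6.Name}, k6); err != nil {
			return err
		}

		cleanObj := k6.DeepCopyObject().(client.Object)

		// Only patch if the proposed status really moves the test run forward.
		if !k6.Status.SetIfNewer(proposed) {
			return nil
		}

		return r.Client.Status().Patch(ctx, k6, client.MergeFrom(cleanObj))
	})
}
```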