Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor initializer as SubReconciler #399

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 237 additions & 36 deletions config/crd/bases/k6.io_k6s.yaml

Large diffs are not rendered by default.

273 changes: 237 additions & 36 deletions config/crd/bases/k6.io_testruns.yaml

Large diffs are not rendered by default.

65 changes: 37 additions & 28 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,39 @@ const (
errMessageTooLong = "Creation of %s takes too long: your configuration might be off. Check if %v were created successfully."
)

// Test seams for unit tests: production code calls these package-level
// function variables, and tests swap in fakes (see fake_rest_client.go)
// so no live cluster is needed.
var (
	getRestClientF = getRestClient
	podLogsF       = podLogs
)

// getRestClient builds a Kubernetes clientset from the in-cluster
// (service-account) REST configuration.
//
// TODO: if these errors repeat several times, it'd be a real error case
// scenario. How likely is it? Should we track frequency of these errors here?
func getRestClient() (kubernetes.Interface, error) {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}

	return kubernetes.NewForConfig(cfg)
}

// podLogs opens a stream over the logs of the "k6" container of the named
// pod in namespace ns. The caller is responsible for closing the returned
// ReadCloser.
//
// pods/log is not currently supported by the controller-runtime client and
// it is officially recommended to use a REST client instead:
// https://github.com/kubernetes-sigs/controller-runtime/issues/1229
func podLogs(ctx context.Context, ns, name string) (io.ReadCloser, error) {
	cs, err := getRestClientF()
	if err != nil {
		return nil, err
	}

	opts := &corev1.PodLogOptions{
		Container: "k6",
	}
	return cs.CoreV1().Pods(ns).GetLogs(name, opts).Stream(ctx)
}

// It may take some time to retrieve inspect output so indicate with boolean if it's ready
// and use returnErr only for errors that require a change of behaviour. All other errors
// should just be logged.
Expand Down Expand Up @@ -59,42 +92,18 @@ func inspectTestRun(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI,
return
}

// Here we need to get the output of the pod
// pods/log is not currently supported by controller-runtime client and it is officially
// recommended to use REST client instead:
// https://github.com/kubernetes-sigs/controller-runtime/issues/1229

// TODO: if the below errors repeat several times, it'd be a real error case scenario.
// How likely is it? Should we track frequency of these errors here?
config, err := rest.InClusterConfig()
if err != nil {
log.Error(err, "unable to fetch in-cluster REST config")
returnErr = err
return
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Error(err, "unable to get access to clientset")
returnErr = err
return
}
req := clientset.CoreV1().Pods(k6.NamespacedName().Namespace).GetLogs(podList.Items[0].Name, &corev1.PodLogOptions{
Container: "k6",
})
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
ctx, cancel := context.WithTimeout(ctx, time.Second*60)
defer cancel()

podLogs, err := req.Stream(ctx)
logs, err := podLogsF(ctx, k6.NamespacedName().Namespace, podList.Items[0].Name)
if err != nil {
log.Error(err, "unable to stream logs from the pod")
returnErr = err
return
}
defer podLogs.Close()
defer logs.Close()

buf := new(bytes.Buffer)
_, returnErr = io.Copy(buf, podLogs)
_, returnErr = io.Copy(buf, logs)
if err != nil {
log.Error(err, "unable to copy logs from the pod")
return
Expand Down
50 changes: 50 additions & 0 deletions controllers/fake_rest_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package controllers

import (
"context"
"io"
"strings"
"time"

k6types "go.k6.io/k6/lib/types"

"github.com/grafana/k6-operator/pkg/cloud"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
clientgotesting "k8s.io/client-go/testing"
)

// contextKey is a dedicated key type for context values, avoiding the
// collisions staticcheck SA1029 warns about when bare strings are used
// as context keys.
type contextKey string // for SA1029 staticcheck

var (
	// mockContextKey lets a test override the canned log output returned
	// by fakePodLogs by stashing a replacement string in the context.
	mockContextKey = contextKey("test-pod-log")

	// mockInspectOutput is the parsed form of the default JSON emitted by
	// fakePodLogs (see its impl. below), for tests to compare against.
	mockInspectOutput = cloud.InspectOutput{
		MaxVUs:        2,
		TotalDuration: k6types.NullDurationFrom(time.Second * 5),
	}
)

// fakeGetRestClient returns a fake clientset that stands in for the
// in-cluster REST client during unit tests. A reactor is registered for
// "get pods/log" but does not handle the action yet.
func fakeGetRestClient() (kubernetes.Interface, error) {
	cset := fake.NewSimpleClientset()
	reactor := func(action clientgotesting.Action) (bool, runtime.Object, error) {
		// nothing to inject ATM, as the runtime.Object is ignored here
		// TODO
		return false, nil, nil
	}
	cset.AddReactor("get", "pods/log", clientgotesting.ReactionFunc(reactor))
	return cset, nil
}

// fakePodLogs is the test double for podLogs. By default it returns a canned
// `k6 inspect` JSON payload (matching mockInspectOutput); a test can override
// the payload by storing a string under mockContextKey in ctx.
func fakePodLogs(ctx context.Context, ns, name string) (io.ReadCloser, error) {
	payload := `{"totalDuration": "5s","maxVUs": 2, "thresholds": null}`
	if override, ok := ctx.Value(mockContextKey).(string); ok && override != "" {
		payload = override
	}
	return io.NopCloser(strings.NewReader(payload)), nil
}
103 changes: 103 additions & 0 deletions controllers/initializer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package controllers

import (
"context"
"fmt"
"github.com/go-logr/logr"
"time"

"go.k6.io/k6/cloudapi"

"github.com/grafana/k6-operator/api/v1alpha1"
"github.com/grafana/k6-operator/pkg/cloud"
"github.com/grafana/k6-operator/pkg/types"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"reconciler.io/runtime/reconcilers"
)

// InitializerReconciler returns a SubReconciler that reads the output of the
// initializer job (`k6 inspect` results from the pod logs, via inspectTestRun),
// stashes the parsed result under inspectStashKey, and sets the cloud-related
// conditions on the TestRun based on its CLI arguments.
//
// It requires a *cloudapi.Client to have been stashed under gck6ClientStashKey
// by an earlier reconciler in the chain.
func InitializerReconciler(c reconcilers.Config) reconcilers.SubReconciler[v1alpha1.TestRunI] {
	return &reconcilers.SyncReconciler[v1alpha1.TestRunI]{
		Name:                   "InitializerJob",
		SyncDuringFinalization: false,
		SyncWithResult: func(ctx context.Context, tr v1alpha1.TestRunI) (res reconcilers.Result, err error) {
			// initializer is a quick job so check in frequently
			res = reconcilers.Result{RequeueAfter: time.Second * 5}

			log := logr.FromContextOrDiscard(ctx)
			// A missing client is a wiring error in the reconciler chain,
			// not a transient condition.
			gck6, ok := reconcilers.RetrieveValue(ctx, gck6ClientStashKey).(*cloudapi.Client)
			if !ok {
				return res, fmt.Errorf("expected stashed value for key %q", gck6ClientStashKey)
			}

			inspectOutput, inspectReady, err := inspectTestRun(ctx, log, tr, c.Client)

			if err != nil {
				// TODO: move to separate events handling
				// input: tr, gck6 client, code of event, log. As method of gck6 client?

				// Cloud output test run is not created yet at this point, so sending
				// events is possible only for PLZ test run.
				if v1alpha1.IsTrue(tr, v1alpha1.CloudPLZTestRun) {
					// This error won't allow to start a test so let k6 Cloud know of it
					events := cloud.ErrorEvent(cloud.K6OperatorStartError).
						WithDetail(fmt.Sprintf("Failed to inspect the test script: %v", err)).
						WithAbort()
					cloud.SendTestRunEvents(gck6, v1alpha1.TestRunID(tr), log, events)
				}

				// inspectTestRun made a log message already so just return error without requeue
				return reconcilers.Result{Requeue: false}, err
			}
			if !inspectReady {
				// Inspect output not available yet; requeue and try again shortly.
				return res, nil
			}
			log.Info(fmt.Sprintf("k6 inspect: %+v", inspectOutput))
			// Make the inspect result available to downstream reconcilers.
			reconcilers.StashValue(ctx, inspectStashKey, inspectOutput)

			// Each instance needs at least one VU, so parallelism must not
			// exceed the script's maximum VU count.
			if int32(inspectOutput.MaxVUs) < tr.GetSpec().Parallelism {
				err = fmt.Errorf("number of instances > number of VUs")
				// TODO: surface this error as an event
				log.Error(err, "Parallelism argument cannot be larger than maximum VUs in the script",
					"maxVUs", inspectOutput.MaxVUs,
					"parallelism", tr.GetSpec().Parallelism)

				tr.GetStatus().Stage = "error"

				// NOTE(review): status update is not persisted here yet —
				// carried over from the pre-refactor reconciler:
				// if _, err := r.UpdateStatus(ctx, k6, log); err != nil {
				// 	return ctrl.Result{}, ready, err
				// }

				// Don't requeue in case of this error; unless it's made into a warning as described above.
				return reconcilers.Result{Requeue: false}, err
			}

			// Derive the cloud conditions from whether the script sends
			// output to k6 Cloud (`--out cloud`).
			cli := types.ParseCLI(tr.GetSpec().Arguments)
			if cli.HasCloudOut {
				v1alpha1.UpdateCondition(tr, v1alpha1.CloudTestRun, metav1.ConditionTrue)

				if v1alpha1.IsUnknown(tr, v1alpha1.CloudTestRunCreated) {
					// In case of PLZ test run, this is already set to true
					v1alpha1.UpdateCondition(tr, v1alpha1.CloudTestRunCreated, metav1.ConditionFalse)
				}

				v1alpha1.UpdateCondition(tr, v1alpha1.CloudTestRunFinalized, metav1.ConditionFalse)
			} else {
				v1alpha1.UpdateCondition(tr, v1alpha1.CloudTestRun, metav1.ConditionFalse)
			}

			// NOTE(review): pre-refactor leftovers kept for reference:
			// if _, err := r.UpdateStatus(ctx, k6, log); err != nil {
			// 	return ctrl.Result{}, ready, err
			// }

			// targetImage, err := resolveTargetImage(ctx, c.Client, resource)
			// if err != nil {
			// 	return err
			// }
			// resource.Status.MarkImageResolved()
			// resource.Status.TargetImage = targetImage

			return reconcilers.Result{}, nil
		},
	}
}
Loading
Loading