Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework Initializer container and handling its verdict #450

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 1 addition & 54 deletions controllers/common.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
package controllers

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"time"

"github.com/go-logr/logr"
"github.com/grafana/k6-operator/api/v1alpha1"
"github.com/grafana/k6-operator/pkg/cloud"
"github.com/grafana/k6-operator/pkg/testrun"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -65,52 +58,6 @@ func inspectTestRun(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI,
return
}

// Here we need to get the output of the pod
// pods/log is not currently supported by controller-runtime client and it is officially
// recommended to use REST client instead:
// https://github.com/kubernetes-sigs/controller-runtime/issues/1229

// TODO: if the below errors repeat several times, it'd be a real error case scenario.
// How likely is it? Should we track frequency of these errors here?
config, err := rest.InClusterConfig()
if err != nil {
log.Error(err, "unable to fetch in-cluster REST config")
returnErr = err
return
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Error(err, "unable to get access to clientset")
returnErr = err
return
}
req := clientset.CoreV1().Pods(k6.NamespacedName().Namespace).GetLogs(podList.Items[0].Name, &corev1.PodLogOptions{
Container: "k6",
})
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()

podLogs, err := req.Stream(ctx)
if err != nil {
log.Error(err, "unable to stream logs from the pod")
returnErr = err
return
}
defer podLogs.Close()

buf := new(bytes.Buffer)
_, returnErr = io.Copy(buf, podLogs)
if err != nil {
log.Error(err, "unable to copy logs from the pod")
return
}

if returnErr = json.Unmarshal(buf.Bytes(), &inspectOutput); returnErr != nil {
// this shouldn't normally happen but if it does, let's log output by default
log.Error(returnErr, fmt.Sprintf("unable to marshal: `%s`", buf.String()))
}

ready = true
return
}
Expand Down Expand Up @@ -201,7 +148,7 @@ func (r *TestRunReconciler) hostnames(ctx context.Context, log logr.Logger, abor
err error
)

sl := &v1.ServiceList{}
sl := &corev1.ServiceList{}

if err = r.List(ctx, sl, opts); err != nil {
log.Error(err, "Could not list services")
Expand Down
55 changes: 27 additions & 28 deletions pkg/resources/jobs/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,31 +61,29 @@ func NewInitializerJob(k6 v1alpha1.TestRunI, argLine string) (*batchv1.Job, erro
)
istioCommand, istioEnabled := newIstioCommand(k6.GetSpec().Scuttle.Enabled, []string{"sh", "-c"})
command := append(istioCommand, fmt.Sprintf(
// There can be several scenarios from k6 command here:
// a) script is correct and `k6 inspect` outputs JSON
// b) script is partially incorrect and `k6` outputs a warning log message and
// then a JSON
// c) script is incorrect and `k6` outputs an error log message
// Warnings at this point are not necessary (warning messages will re-appear in
// runner's logs and the user can see them there) so we need a pure JSON here
// without any additional messages in cases a) and b). In case c), output should
// contain error message and the Job is to exit with non-zero code.
//
// Due to some peculiarities of k6 logging, to achieve the above behaviour,
// we need to use a workaround to store all log messages in temp file while
// printing JSON as usual. Then parse temp file only for errors, ignoring
// any other log messages.
// Related: https://github.com/grafana/k6-docs/issues/877
"mkdir -p $(dirname %s) && k6 archive %s -O %s %s 2> /tmp/k6logs && k6 inspect --execution-requirements %s 2> /tmp/k6logs ; ! cat /tmp/k6logs | grep 'level=error'",
archiveName, scriptName, archiveName, argLine,
archiveName))
"k6 archive %s -O %s %s && k6 inspect --execution-requirements %s",
scriptName, archiveName, argLine, archiveName))

env := append(newIstioEnvVar(k6.GetSpec().Scuttle, istioEnabled), k6.GetSpec().Initializer.Env...)

volumes := script.Volume()
// emptyDir to hold our temporary data
tmpVolume := corev1.Volume{
Name: "tmpdir",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
}
volumes = append(volumes, tmpVolume)
volumes = append(volumes, k6.GetSpec().Initializer.Volumes...)

volumeMounts := script.VolumeMount()
// make /tmp an EmptyDir
tmpVolumeMount := corev1.VolumeMount{
Name: "tmpdir",
MountPath: "/tmp",
}
volumeMounts = append(volumeMounts, tmpVolumeMount)
volumeMounts = append(volumeMounts, k6.GetSpec().Initializer.VolumeMounts...)

var zero32 int32
Expand Down Expand Up @@ -116,16 +114,17 @@ func NewInitializerJob(k6 v1alpha1.TestRunI, argLine string) (*batchv1.Job, erro
InitContainers: getInitContainers(k6.GetSpec().Initializer, script),
Containers: []corev1.Container{
{
Image: image,
ImagePullPolicy: k6.GetSpec().Initializer.ImagePullPolicy,
Name: "k6",
Command: command,
Env: env,
Resources: k6.GetSpec().Initializer.Resources,
VolumeMounts: volumeMounts,
EnvFrom: k6.GetSpec().Initializer.EnvFrom,
Ports: ports,
SecurityContext: &k6.GetSpec().Initializer.ContainerSecurityContext,
Image: image,
ImagePullPolicy: k6.GetSpec().Initializer.ImagePullPolicy,
Name: "k6",
Command: command,
Env: env,
Resources: k6.GetSpec().Initializer.Resources,
VolumeMounts: volumeMounts,
EnvFrom: k6.GetSpec().Initializer.EnvFrom,
Ports: ports,
SecurityContext: &k6.GetSpec().Initializer.ContainerSecurityContext,
TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
},
},
Volumes: volumes,
Expand Down
31 changes: 25 additions & 6 deletions pkg/resources/jobs/initializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,24 @@ func TestNewInitializerJob(t *testing.T) {
automountServiceAccountToken := true
zero := int32(0)

volumes := script.Volume()
// emptyDir to hold our temporary data
tmpVolume := corev1.Volume{
Name: "tmpdir",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
}
volumes = append(volumes, tmpVolume)

volumeMounts := script.VolumeMount()
// make /tmp an EmptyDir
tmpVolumeMount := corev1.VolumeMount{
Name: "tmpdir",
MountPath: "/tmp",
}
volumeMounts = append(volumeMounts, tmpVolumeMount)

expectedOutcome := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test-initializer",
Expand Down Expand Up @@ -63,7 +81,7 @@ func TestNewInitializerJob(t *testing.T) {
Name: "k6",
Command: []string{
"sh", "-c",
"mkdir -p $(dirname /tmp/test.js.archived.tar) && k6 archive /test/test.js -O /tmp/test.js.archived.tar --out cloud 2> /tmp/k6logs && k6 inspect --execution-requirements /tmp/test.js.archived.tar 2> /tmp/k6logs ; ! cat /tmp/k6logs | grep 'level=error'",
"k6 archive /test/test.js -O /tmp/test.js.archived.tar --out cloud && k6 inspect --execution-requirements /tmp/test.js.archived.tar",
},
Env: []corev1.EnvVar{},
EnvFrom: []corev1.EnvFromSource{
Expand All @@ -75,13 +93,14 @@ func TestNewInitializerJob(t *testing.T) {
},
},
},
Resources: corev1.ResourceRequirements{},
VolumeMounts: script.VolumeMount(),
Ports: []corev1.ContainerPort{{ContainerPort: 6565}},
SecurityContext: &corev1.SecurityContext{},
Resources: corev1.ResourceRequirements{},
VolumeMounts: volumeMounts,
Ports: []corev1.ContainerPort{{ContainerPort: 6565}},
SecurityContext: &corev1.SecurityContext{},
TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
},
},
Volumes: script.Volume(),
Volumes: volumes,
},
},
},
Expand Down
Loading