diff --git a/controllers/common.go b/controllers/common.go
index 078d96b5..55278fb2 100644
--- a/controllers/common.go
+++ b/controllers/common.go
@@ -1,24 +1,17 @@
 package controllers
 
 import (
-	"bytes"
 	"context"
-	"encoding/json"
 	"errors"
 	"fmt"
-	"io"
-	"time"
 
 	"github.com/go-logr/logr"
 	"github.com/grafana/k6-operator/api/v1alpha1"
 	"github.com/grafana/k6-operator/pkg/cloud"
 	"github.com/grafana/k6-operator/pkg/testrun"
 
 	corev1 "k8s.io/api/core/v1"
-	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
@@ -65,52 +58,6 @@ func inspectTestRun(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI,
 		return
 	}
-	// Here we need to get the output of the pod
-	// pods/log is not currently supported by controller-runtime client and it is officially
-	// recommended to use REST client instead:
-	// https://github.com/kubernetes-sigs/controller-runtime/issues/1229
-
-	// TODO: if the below errors repeat several times, it'd be a real error case scenario.
-	// How likely is it? Should we track frequency of these errors here?
-	config, err := rest.InClusterConfig()
-	if err != nil {
-		log.Error(err, "unable to fetch in-cluster REST config")
-		returnErr = err
-		return
-	}
-
-	clientset, err := kubernetes.NewForConfig(config)
-	if err != nil {
-		log.Error(err, "unable to get access to clientset")
-		returnErr = err
-		return
-	}
-	req := clientset.CoreV1().Pods(k6.NamespacedName().Namespace).GetLogs(podList.Items[0].Name, &corev1.PodLogOptions{
-		Container: "k6",
-	})
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
-	defer cancel()
-
-	podLogs, err := req.Stream(ctx)
-	if err != nil {
-		log.Error(err, "unable to stream logs from the pod")
-		returnErr = err
-		return
-	}
-	defer podLogs.Close()
-
-	buf := new(bytes.Buffer)
-	_, returnErr = io.Copy(buf, podLogs)
-	if err != nil {
-		log.Error(err, "unable to copy logs from the pod")
-		return
-	}
-
-	if returnErr = json.Unmarshal(buf.Bytes(), &inspectOutput); returnErr != nil {
-		// this shouldn't normally happen but if it does, let's log output by default
-		log.Error(returnErr, fmt.Sprintf("unable to marshal: `%s`", buf.String()))
-	}
-
 	ready = true
 
 	return
 }
@@ -201,7 +148,7 @@ func (r *TestRunReconciler) hostnames(ctx context.Context, log logr.Logger, abor
 		err error
 	)
 
-	sl := &v1.ServiceList{}
+	sl := &corev1.ServiceList{}
 
 	if err = r.List(ctx, sl, opts); err != nil {
 		log.Error(err, "Could not list services")
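Note on the removal above: inspectTestRun no longer builds a REST config and clientset to stream the pod's logs, and this hunk alone does not show where the `k6 inspect` JSON is read from instead. Given the TerminationMessageFallbackToLogsOnError policy added to the initializer container below, one plausible mechanism is to read the terminated container's termination message through the regular controller-runtime client. This is a minimal sketch under that assumption, not the PR's actual code; the helper name is hypothetical, and it only uses packages controllers/common.go already imports:

```go
// terminationMessage is a hypothetical helper: it fetches the initializer pod and
// returns the "k6" container's termination message. With
// TerminationMessageFallbackToLogsOnError set, Kubernetes falls back to the tail
// of the container log, so the inspect JSON (or the error output) would land here
// and could be json.Unmarshal-ed as before.
func terminationMessage(ctx context.Context, c client.Client, key types.NamespacedName) (string, error) {
	var pod corev1.Pod
	if err := c.Get(ctx, key, &pod); err != nil {
		return "", err
	}
	for _, cs := range pod.Status.ContainerStatuses {
		if cs.Name == "k6" && cs.State.Terminated != nil {
			return cs.State.Terminated.Message, nil
		}
	}
	return "", fmt.Errorf("container %q has not terminated yet", "k6")
}
```

Worth noting: the fallback termination message is size-limited, so it suits compact output such as the inspect JSON rather than a full log stream.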
diff --git a/pkg/resources/jobs/initializer.go b/pkg/resources/jobs/initializer.go
index 64bb2de4..9db0da01 100644
--- a/pkg/resources/jobs/initializer.go
+++ b/pkg/resources/jobs/initializer.go
@@ -61,31 +61,29 @@ func NewInitializerJob(k6 v1alpha1.TestRunI, argLine string) (*batchv1.Job, erro
 	)
 
 	istioCommand, istioEnabled := newIstioCommand(k6.GetSpec().Scuttle.Enabled, []string{"sh", "-c"})
 	command := append(istioCommand, fmt.Sprintf(
-		// There can be several scenarios from k6 command here:
-		// a) script is correct and `k6 inspect` outputs JSON
-		// b) script is partially incorrect and `k6` outputs a warning log message and
-		// then a JSON
-		// c) script is incorrect and `k6` outputs an error log message
-		// Warnings at this point are not necessary (warning messages will re-appear in
-		// runner's logs and the user can see them there) so we need a pure JSON here
-		// without any additional messages in cases a) and b). In case c), output should
-		// contain error message and the Job is to exit with non-zero code.
-		//
-		// Due to some pecularities of k6 logging, to achieve the above behaviour,
-		// we need to use a workaround to store all log messages in temp file while
-		// printing JSON as usual. Then parse temp file only for errors, ignoring
-		// any other log messages.
-		// Related: https://github.com/grafana/k6-docs/issues/877
-		"mkdir -p $(dirname %s) && k6 archive %s -O %s %s 2> /tmp/k6logs && k6 inspect --execution-requirements %s 2> /tmp/k6logs ; ! cat /tmp/k6logs | grep 'level=error'",
-		archiveName, scriptName, archiveName, argLine,
-		archiveName))
+		"k6 archive %s -O %s %s && k6 inspect --execution-requirements %s",
+		scriptName, archiveName, argLine, archiveName))
 
 	env := append(newIstioEnvVar(k6.GetSpec().Scuttle, istioEnabled), k6.GetSpec().Initializer.Env...)
 
 	volumes := script.Volume()
+	// emptyDir to hold our temporary data
+	tmpVolume := corev1.Volume{
+		Name: "tmpdir",
+		VolumeSource: corev1.VolumeSource{
+			EmptyDir: &corev1.EmptyDirVolumeSource{},
+		},
+	}
+	volumes = append(volumes, tmpVolume)
 	volumes = append(volumes, k6.GetSpec().Initializer.Volumes...)
 
 	volumeMounts := script.VolumeMount()
+	// make /tmp an EmptyDir
+	tmpVolumeMount := corev1.VolumeMount{
+		Name:      "tmpdir",
+		MountPath: "/tmp",
+	}
+	volumeMounts = append(volumeMounts, tmpVolumeMount)
 	volumeMounts = append(volumeMounts, k6.GetSpec().Initializer.VolumeMounts...)
 
 	var zero32 int32
@@ -116,16 +114,17 @@ func NewInitializerJob(k6 v1alpha1.TestRunI, argLine string) (*batchv1.Job, erro
 					InitContainers: getInitContainers(k6.GetSpec().Initializer, script),
 					Containers: []corev1.Container{
 						{
-							Image:           image,
-							ImagePullPolicy: k6.GetSpec().Initializer.ImagePullPolicy,
-							Name:            "k6",
-							Command:         command,
-							Env:             env,
-							Resources:       k6.GetSpec().Initializer.Resources,
-							VolumeMounts:    volumeMounts,
-							EnvFrom:         k6.GetSpec().Initializer.EnvFrom,
-							Ports:           ports,
-							SecurityContext: &k6.GetSpec().Initializer.ContainerSecurityContext,
+							Image:                    image,
+							ImagePullPolicy:          k6.GetSpec().Initializer.ImagePullPolicy,
+							Name:                     "k6",
+							Command:                  command,
+							Env:                      env,
+							Resources:                k6.GetSpec().Initializer.Resources,
+							VolumeMounts:             volumeMounts,
+							EnvFrom:                  k6.GetSpec().Initializer.EnvFrom,
+							Ports:                    ports,
+							SecurityContext:          &k6.GetSpec().Initializer.ContainerSecurityContext,
+							TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
 						},
 					},
 					Volumes: volumes,
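For reference, this is what the simplified format string renders to with the sample values used in the test below (scriptName = /test/test.js, archiveName = /tmp/test.js.archived.tar, argLine = --out cloud). The `mkdir -p` is no longer needed because the archive path lives under /tmp, which is now backed by the tmpdir emptyDir, and the `/tmp/k6logs` redirect-and-grep workaround is gone: without the `2>` redirects, error output reaches the container log directly and, via the TerminationMessageFallbackToLogsOnError policy above, the container's termination message. A quick standalone sketch (sample values only, not anything the operator hard-codes):

```go
package main

import "fmt"

func main() {
	// Sample values taken from initializer_test.go; at runtime the operator
	// derives these from the TestRun spec.
	scriptName := "/test/test.js"
	archiveName := "/tmp/test.js.archived.tar"
	argLine := "--out cloud"

	// Same format string as in NewInitializerJob above.
	cmd := fmt.Sprintf(
		"k6 archive %s -O %s %s && k6 inspect --execution-requirements %s",
		scriptName, archiveName, argLine, archiveName)

	fmt.Println(cmd)
	// Output:
	// k6 archive /test/test.js -O /tmp/test.js.archived.tar --out cloud && k6 inspect --execution-requirements /tmp/test.js.archived.tar
}
```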
diff --git a/pkg/resources/jobs/initializer_test.go b/pkg/resources/jobs/initializer_test.go
index 868d13ef..10d7ac58 100644
--- a/pkg/resources/jobs/initializer_test.go
+++ b/pkg/resources/jobs/initializer_test.go
@@ -21,6 +21,24 @@ func TestNewInitializerJob(t *testing.T) {
 	automountServiceAccountToken := true
 	zero := int32(0)
 
+	volumes := script.Volume()
+	// emptyDir to hold our temporary data
+	tmpVolume := corev1.Volume{
+		Name: "tmpdir",
+		VolumeSource: corev1.VolumeSource{
+			EmptyDir: &corev1.EmptyDirVolumeSource{},
+		},
+	}
+	volumes = append(volumes, tmpVolume)
+
+	volumeMounts := script.VolumeMount()
+	// make /tmp an EmptyDir
+	tmpVolumeMount := corev1.VolumeMount{
+		Name:      "tmpdir",
+		MountPath: "/tmp",
+	}
+	volumeMounts = append(volumeMounts, tmpVolumeMount)
+
 	expectedOutcome := &batchv1.Job{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "test-initializer",
@@ -63,7 +81,7 @@ func TestNewInitializerJob(t *testing.T) {
 							Name: "k6",
 							Command: []string{
 								"sh", "-c",
-								"mkdir -p $(dirname /tmp/test.js.archived.tar) && k6 archive /test/test.js -O /tmp/test.js.archived.tar --out cloud 2> /tmp/k6logs && k6 inspect --execution-requirements /tmp/test.js.archived.tar 2> /tmp/k6logs ; ! cat /tmp/k6logs | grep 'level=error'",
+								"k6 archive /test/test.js -O /tmp/test.js.archived.tar --out cloud && k6 inspect --execution-requirements /tmp/test.js.archived.tar",
 							},
 							Env: []corev1.EnvVar{},
 							EnvFrom: []corev1.EnvFromSource{
@@ -75,13 +93,14 @@ func TestNewInitializerJob(t *testing.T) {
 								{
 									SecretRef: &corev1.SecretEnvSource{
 										LocalObjectReference: corev1.LocalObjectReference{
 											Name: "env",
 										},
 									},
 								},
 							},
-							Resources:       corev1.ResourceRequirements{},
-							VolumeMounts:    script.VolumeMount(),
-							Ports:           []corev1.ContainerPort{{ContainerPort: 6565}},
-							SecurityContext: &corev1.SecurityContext{},
+							Resources:                corev1.ResourceRequirements{},
+							VolumeMounts:             volumeMounts,
+							Ports:                    []corev1.ContainerPort{{ContainerPort: 6565}},
+							SecurityContext:          &corev1.SecurityContext{},
+							TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
 						},
 					},
-					Volumes: script.Volume(),
+					Volumes: volumes,
 				},
 			},
 		},