Skip to content

Commit

Permalink
added improvement how we are fetching results (#453)
Browse files Browse the repository at this point in the history
Signed-off-by: Jasmin Gacic <jasmin.gacic@gmail.com>
  • Loading branch information
jasmingacic authored Oct 28, 2021
1 parent a810688 commit a2d55d5
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 37 deletions.
101 changes: 64 additions & 37 deletions pkg/jobs/jobclient.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package jobs

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -93,28 +94,45 @@ func (c *JobClient) LaunchK8sJob(image string, repo result.Repository, execution
ErrorMessage: err.Error(),
}, err
}

execResult := testkube.ExecutionResult{}

for _, pod := range pods.Items {
if pod.Status.Phase != v1.PodRunning {
if pod.Labels["job-name"] == execution.Id {
if err := wait.PollImmediate(time.Second, time.Duration(0)*time.Second, k8sclient.IsPodRunning(c.ClientSet, pod.Name, c.Namespace)); err != nil {
return testkube.ExecutionResult{
fmt.Println("HERE 102")
go func() {
if pod.Labels["job-name"] == execution.Id {
if err := wait.PollImmediate(time.Second, time.Duration(0)*time.Second, k8sclient.HasPodSucceeded(c.ClientSet, pod.Name, c.Namespace)); err != nil {
execResult = testkube.ExecutionResult{
Status: testkube.StatusPtr(testkube.ERROR__ExecutionStatus),
ErrorMessage: err.Error(),
}

repo.UpdateResult(context.TODO(), execution.Id, execResult)
return
}
}
result, err = c.GetPodLogs(pod.Name, execution, repo, false)
if err != nil {
execResult = testkube.ExecutionResult{
Status: testkube.StatusPtr(testkube.ERROR__ExecutionStatus),
ErrorMessage: err.Error(),
}, err
}
repo.UpdateResult(context.TODO(), execution.Id, execResult)
return
}
}
result, err = c.GetPodLogs(pod.Name, execution, repo)
if err != nil {
return testkube.ExecutionResult{
Status: testkube.StatusPtr(testkube.ERROR__ExecutionStatus),
ErrorMessage: err.Error(),
}, err
}
execResult = testkube.ExecutionResult{
Status: testkube.StatusPtr(testkube.SUCCESS_ExecutionStatus),
Output: result,
}
repo.UpdateResult(context.TODO(), execution.Id, execResult)
}()
}
}
fmt.Println("HERE 132")

return testkube.ExecutionResult{
Status: testkube.StatusPtr(testkube.SUCCESS_ExecutionStatus),
Status: testkube.StatusPtr(testkube.PENDING_ExecutionStatus),
Output: result,
}, nil
}
Expand All @@ -134,12 +152,12 @@ func (c *JobClient) GetJobPods(podsClient pods.PodInterface, jobName string, ret
return pods, nil
}

func (c *JobClient) GetPodLogs(podName string, execution testkube.Execution, repo result.Repository) (string, error) {
func (c *JobClient) GetPodLogs(podName string, execution testkube.Execution, repo result.Repository, tail bool) (string, error) {
count := int64(100)
var toReturn string
var message string
podLogOptions := v1.PodLogOptions{
Follow: true,
Follow: tail,
TailLines: &count,
}

Expand All @@ -153,32 +171,41 @@ func (c *JobClient) GetPodLogs(podName string, execution testkube.Execution, rep

defer stream.Close()

for {
buf := make([]byte, 2000)
numBytes, err := stream.Read(buf)
if numBytes == 0 {
break
}
if err == io.EOF {
break
}
if err != nil {
return "", err
}

message = string(buf[:numBytes])
if strings.Contains(message, fmt.Sprintf("$$$%s$$$", execution.Id)) {
message = ""
break
} else {
toReturn += message
execution.ExecutionResult.Output = toReturn
err := repo.UpdateResult(context.Background(), execution.Id, *execution.ExecutionResult)
if tail {
for {
buf := make([]byte, 2000)
numBytes, err := stream.Read(buf)
if numBytes == 0 {
break
}
if err == io.EOF {
break
}
if err != nil {
fmt.Println(err)
return "", err
}

message = string(buf[:numBytes])
if strings.Contains(message, fmt.Sprintf("$$$%s$$$", execution.Id)) {
message = ""
break
} else {
toReturn += message
execution.ExecutionResult.Output = toReturn
err := repo.UpdateResult(context.Background(), execution.Id, *execution.ExecutionResult)
if err != nil {
fmt.Println(err)
break
}
}
}
} else {
buf := new(bytes.Buffer)
_, err = io.Copy(buf, stream)
if err != nil {
return "", err
}
toReturn = buf.String()
}
return toReturn, nil
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/k8sclient/k8sclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,23 @@ func IsPodRunning(c *kubernetes.Clientset, podName, namespace string) wait.Condi
}
}

func HasPodSucceeded(c *kubernetes.Clientset, podName, namespace string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{})
if err != nil {
return false, err
}

switch pod.Status.Phase {
case corev1.PodSucceeded:
return true, nil
case corev1.PodFailed:
return false, nil
}
return false, nil
}
}

// IsPodReady check if the pod in question is running state
func IsPodReady(c *kubernetes.Clientset, podName, namespace string) wait.ConditionFunc {
return func() (bool, error) {
Expand Down

0 comments on commit a2d55d5

Please sign in to comment.