Skip to content

Commit

Permalink
Add support for multiple containers in created pods (#1887)
Browse files Browse the repository at this point in the history
* Add support for multiple containers in created pods

* fix package name

* fix for updated branch

* fix for updated branch

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
AKhoria and mergify[bot] committed Mar 1, 2023
1 parent afe5fb3 commit ea9d1a2
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 121 deletions.
2 changes: 1 addition & 1 deletion pkg/function/kube_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func kubeTaskPodFunc(cli kubernetes.Interface) func(ctx context.Context, pod *v1
ctx = field.Context(ctx, consts.PodNameKey, pod.Name)
ctx = field.Context(ctx, consts.LogKindKey, consts.LogKindDatapath)
// Fetch logs from the pod
r, err := kube.StreamPodLogs(ctx, cli, pod.Namespace, pod.Name)
r, err := kube.StreamPodLogs(ctx, cli, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
if err != nil {
return nil, errors.Wrapf(err, "Failed to fetch logs from the pod")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/function/prepare_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func prepareDataPodFunc(cli kubernetes.Interface) func(ctx context.Context, pod
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name)
}
// Fetch logs from the pod
logs, err := kube.GetPodLogs(ctx, cli, pod.Namespace, pod.Name)
logs, err := kube.GetPodLogs(ctx, cli, pod.Namespace, pod.Name, pod.Spec.Containers[0].Name)
if err != nil {
return nil, errors.Wrapf(err, "Failed to fetch logs from the pod")
}
Expand Down
29 changes: 20 additions & 9 deletions pkg/kube/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"io"
"os"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -140,6 +141,11 @@ func GetPodObjectFromPodOptions(cli kubernetes.Interface, opts *PodOptions) (*v1
return nil, errors.Wrapf(err, "Failed to create pod. Failed to override pod specs. Namespace: %s, NameFmt: %s", opts.Namespace, opts.GenerateName)
}

// Always put the main container the first
sort.Slice(patchedSpecs.Containers, func(i, j int) bool {
return patchedSpecs.Containers[i].Name == defaultContainerName
})

pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: opts.GenerateName,
Expand Down Expand Up @@ -215,16 +221,17 @@ func DeletePod(ctx context.Context, cli kubernetes.Interface, pod *v1.Pod) error
return nil
}

func StreamPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, name string) (io.ReadCloser, error) {
func StreamPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, podName, containerName string) (io.ReadCloser, error) {
plo := &v1.PodLogOptions{
Follow: true,
Follow: true,
Container: containerName,
}
return cli.CoreV1().Pods(namespace).GetLogs(name, plo).Stream(ctx)
return cli.CoreV1().Pods(namespace).GetLogs(podName, plo).Stream(ctx)
}

// GetPodLogs fetches the logs from the given pod
func GetPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, name string) (string, error) {
reader, err := cli.CoreV1().Pods(namespace).GetLogs(name, &v1.PodLogOptions{}).Stream(ctx)
func GetPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, podName, containerName string) (string, error) {
reader, err := cli.CoreV1().Pods(namespace).GetLogs(podName, &v1.PodLogOptions{Container: containerName}).Stream(ctx)
if err != nil {
return "", err
}
Expand All @@ -237,8 +244,8 @@ func GetPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, name s
}

// getErrorFromLogs fetches logs from pod and constructs error containing last ten lines of log and specified error message
func getErrorFromLogs(ctx context.Context, cli kubernetes.Interface, namespace, podName string, err error, errorMessage string) error {
r, logErr := StreamPodLogs(ctx, cli, namespace, podName)
func getErrorFromLogs(ctx context.Context, cli kubernetes.Interface, namespace, podName, containerName string, err error, errorMessage string) error {
r, logErr := StreamPodLogs(ctx, cli, namespace, podName, containerName)
if logErr != nil {
return errors.Wrapf(logErr, "Failed to fetch logs from the pod")
}
Expand All @@ -257,12 +264,14 @@ func WaitForPodReady(ctx context.Context, cli kubernetes.Interface, namespace, n
timeoutCtx, waitCancel := context.WithTimeout(ctx, GetPodReadyWaitTimeout())
defer waitCancel()
attachLog := true
containerForLogs := ""
err := poll.Wait(timeoutCtx, func(ctx context.Context) (bool, error) {
p, err := cli.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
attachLog = false
return false, err
}
containerForLogs = p.Spec.Containers[0].Name

// check if nodes are up and available
err = checkNodesStatus(p, cli)
Expand Down Expand Up @@ -294,7 +303,7 @@ func WaitForPodReady(ctx context.Context, cli kubernetes.Interface, namespace, n

errorMessage := fmt.Sprintf("Pod did not transition into running state. Timeout:%v Namespace:%s, Name:%s", GetPodReadyWaitTimeout(), namespace, name)
if attachLog {
return getErrorFromLogs(ctx, cli, namespace, name, err, errorMessage)
return getErrorFromLogs(ctx, cli, namespace, name, containerForLogs, err, errorMessage)
}

return errors.Wrap(err, errorMessage)
Expand Down Expand Up @@ -376,12 +385,14 @@ func checkPVCAndPVStatus(ctx context.Context, vol v1.Volume, p *v1.Pod, cli kube
// WaitForPodCompletion waits for a pod to reach a terminal state, or timeout
func WaitForPodCompletion(ctx context.Context, cli kubernetes.Interface, namespace, name string) error {
attachLog := true
containerForLogs := ""
err := poll.Wait(ctx, func(ctx context.Context) (bool, error) {
p, err := cli.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
attachLog = false
return true, err
}
containerForLogs = p.Spec.Containers[0].Name
switch p.Status.Phase {
case v1.PodFailed:
return false, errors.Errorf("Pod %s failed. Pod status: %s", name, p.Status.String())
Expand All @@ -391,7 +402,7 @@ func WaitForPodCompletion(ctx context.Context, cli kubernetes.Interface, namespa

errorMessage := "Pod failed or did not transition into complete state"
if attachLog {
return getErrorFromLogs(ctx, cli, namespace, name, err, errorMessage)
return getErrorFromLogs(ctx, cli, namespace, name, containerForLogs, err, errorMessage)
}
return errors.Wrap(err, errorMessage)
}
Expand Down
Loading

0 comments on commit ea9d1a2

Please sign in to comment.