diff --git a/src/internal/packager/helm/zarf.go b/src/internal/packager/helm/zarf.go
index 94996b4173..8ff110ce0e 100644
--- a/src/internal/packager/helm/zarf.go
+++ b/src/internal/packager/helm/zarf.go
@@ -8,9 +8,10 @@ import (
 	"context"
 	"fmt"
 
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
 	"github.com/defenseunicorns/zarf/src/internal/packager/template"
 	"github.com/defenseunicorns/zarf/src/pkg/cluster"
-	"github.com/defenseunicorns/zarf/src/pkg/k8s"
 	"github.com/defenseunicorns/zarf/src/pkg/message"
 	"github.com/defenseunicorns/zarf/src/pkg/transform"
 	"github.com/defenseunicorns/zarf/src/pkg/utils"
@@ -61,18 +62,21 @@ func (h *Helm) UpdateZarfAgentValues(ctx context.Context) error {
 	}
 
 	// Get the current agent image from one of its pods.
-	pods := h.cluster.WaitForPodsAndContainers(
-		ctx,
-		k8s.PodLookup{
-			Namespace: cluster.ZarfNamespaceName,
-			Selector:  "app=agent-hook",
-		},
-		nil,
-	)
+	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{"app": "agent-hook"}})
+	if err != nil {
+		return err
+	}
+	listOpts := metav1.ListOptions{
+		LabelSelector: selector.String(),
+	}
+	podList, err := h.cluster.Clientset.CoreV1().Pods(cluster.ZarfNamespaceName).List(ctx, listOpts)
+	if err != nil {
+		return err
+	}
 
 	var currentAgentImage transform.Image
-	if len(pods) > 0 && len(pods[0].Spec.Containers) > 0 {
-		currentAgentImage, err = transform.ParseImageRef(pods[0].Spec.Containers[0].Image)
+	if len(podList.Items) > 0 && len(podList.Items[0].Spec.Containers) > 0 {
+		currentAgentImage, err = transform.ParseImageRef(podList.Items[0].Spec.Containers[0].Image)
 		if err != nil {
 			return fmt.Errorf("unable to parse current agent image reference: %w", err)
 		}
@@ -124,13 +128,20 @@ func (h *Helm) UpdateZarfAgentValues(ctx context.Context) error {
 	defer spinner.Stop()
 
 	// Force pods to be recreated to get the updated secret.
-	err = h.cluster.DeletePods(
-		ctx,
-		k8s.PodLookup{
-			Namespace: cluster.ZarfNamespaceName,
-			Selector:  "app=agent-hook",
-		},
-	)
+	deleteGracePeriod := int64(0)
+	deletePolicy := metav1.DeletePropagationForeground
+	deleteOpts := metav1.DeleteOptions{
+		GracePeriodSeconds: &deleteGracePeriod,
+		PropagationPolicy:  &deletePolicy,
+	}
+	selector, err = metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{"app": "agent-hook"}})
+	if err != nil {
+		return err
+	}
+	listOpts = metav1.ListOptions{
+		LabelSelector: selector.String(),
+	}
+	err = h.cluster.Clientset.CoreV1().Pods(cluster.ZarfNamespaceName).DeleteCollection(ctx, deleteOpts, listOpts)
 	if err != nil {
 		return fmt.Errorf("error recycling pods for the Zarf Agent: %w", err)
 	}
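Reviewer note: the list-then-recycle flow above is easy to poke at in isolation with client-go's fake clientset. The sketch below is illustrative only — the pod name and namespace are invented, and this snippet is not part of the change:

// Illustrative only: the label-selector flow from UpdateZarfAgentValues
// exercised against client-go's fake clientset.
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// Seed the fake cluster with one agent pod (name and namespace invented).
	clientset := fake.NewSimpleClientset(&corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "agent-hook-abc123",
			Namespace: "zarf",
			Labels:    map[string]string{"app": "agent-hook"},
		},
	})

	// Same conversion the patch uses: a label map -> ListOptions selector string.
	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{"app": "agent-hook"}})
	if err != nil {
		panic(err)
	}
	listOpts := metav1.ListOptions{LabelSelector: selector.String()}

	podList, err := clientset.CoreV1().Pods("zarf").List(context.Background(), listOpts)
	if err != nil {
		panic(err)
	}
	fmt.Println("matched pods:", len(podList.Items)) // matched pods: 1

	// Against a real cluster, the same listOpts value pairs with DeleteOptions
	// in DeleteCollection, which keeps the list and delete targeting identical.
}

Reusing one listOpts for both the List and the DeleteCollection is the design point worth preserving here: the pods inspected and the pods recycled are guaranteed to be selected the same way.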
diff --git a/src/pkg/cluster/data.go b/src/pkg/cluster/data.go
index 0c5e526536..3253554544 100644
--- a/src/pkg/cluster/data.go
+++ b/src/pkg/cluster/data.go
@@ -9,26 +9,30 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
 
 	"github.com/defenseunicorns/pkg/helpers"
+
 	"github.com/defenseunicorns/zarf/src/config"
-	"github.com/defenseunicorns/zarf/src/pkg/k8s"
 	"github.com/defenseunicorns/zarf/src/pkg/layout"
 	"github.com/defenseunicorns/zarf/src/pkg/message"
 	"github.com/defenseunicorns/zarf/src/pkg/utils"
 	"github.com/defenseunicorns/zarf/src/pkg/utils/exec"
 	"github.com/defenseunicorns/zarf/src/types"
-	corev1 "k8s.io/api/core/v1"
 )
 
 // HandleDataInjection waits for the target pod(s) to come up and inject the data into them
 // todo: this currently requires kubectl but we should have enough k8s work to make this native now.
 func (c *Cluster) HandleDataInjection(ctx context.Context, wg *sync.WaitGroup, data types.ZarfDataInjection, componentPath *layout.ComponentPaths, dataIdx int) {
 	defer wg.Done()
-
 	injectionCompletionMarker := filepath.Join(componentPath.DataInjections, config.GetDataInjectionMarker())
 	if err := os.WriteFile(injectionCompletionMarker, []byte("🦄"), helpers.ReadWriteUser); err != nil {
 		message.WarnErrf(err, "Unable to create the data injection completion marker")
@@ -68,14 +72,14 @@ iterator:
 			}
 		}
 
-		target := k8s.PodLookup{
+		target := podLookup{
 			Namespace: data.Target.Namespace,
 			Selector:  data.Target.Selector,
 			Container: data.Target.Container,
 		}
 
 		// Wait until the pod we are injecting data into becomes available
-		pods := c.WaitForPodsAndContainers(ctx, target, podFilterByInitContainer)
+		pods := waitForPodsAndContainers(ctx, c.Clientset, target, podFilterByInitContainer)
 		if len(pods) < 1 {
 			continue
 		}
@@ -132,7 +136,7 @@ iterator:
 		}
 
 		// Do not look for a specific container after injection in case they are running an init container
-		podOnlyTarget := k8s.PodLookup{
+		podOnlyTarget := podLookup{
 			Namespace: data.Target.Namespace,
 			Selector:  data.Target.Selector,
 		}
@@ -140,7 +144,7 @@ iterator:
 		// Block one final time to make sure at least one pod has come up and injected the data
 		// Using only the pod as the final selector because we don't know what the container name will be
 		// Still using the init container filter to make sure we have the right running pod
-		_ = c.WaitForPodsAndContainers(ctx, podOnlyTarget, podFilterByInitContainer)
+		_ = waitForPodsAndContainers(ctx, c.Clientset, podOnlyTarget, podFilterByInitContainer)
 
 		// Cleanup now to reduce disk pressure
 		_ = os.RemoveAll(source)
@@ -149,3 +153,96 @@ iterator:
 		return
 	}
 }
+
+// podLookup is a struct for specifying a pod to target for data injection or lookups.
+type podLookup struct {
+	Namespace string
+	Selector  string
+	Container string
+}
+
+// podFilter is a function that returns true if the pod should be targeted for data injection or lookups.
+type podFilter func(pod corev1.Pod) bool
+
+// waitForPodsAndContainers attempts to find pods matching the given selector and optional inclusion filter.
+// It will wait up to 90 seconds for the pods to be found and will return a list of the matching pods.
+// If the timeout is reached, an empty list will be returned.
+// TODO: Test, refactor and/or remove.
+func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interface, target podLookup, include podFilter) []corev1.Pod {
+	waitCtx, cancel := context.WithTimeout(ctx, 90*time.Second)
+	defer cancel()
+
+	timer := time.NewTimer(0)
+	defer timer.Stop()
+
+	for {
+		select {
+		case <-waitCtx.Done():
+			// k.Log("Pod lookup failed: %v", ctx.Err())
+			return nil
+		case <-timer.C:
+			listOpts := metav1.ListOptions{
+				LabelSelector: target.Selector,
+			}
+			podList, err := clientset.CoreV1().Pods(target.Namespace).List(ctx, listOpts)
+			if err != nil {
+				// k.Log("Unable to find matching pods: %w", err)
+				return nil
+			}
+
+			// k.Log("Found %d pods for target %#v", len(podList.Items), target)
+
+			var readyPods = []corev1.Pod{}
+
+			// Sort the pods from newest to oldest
+			sort.Slice(podList.Items, func(i, j int) bool {
+				return podList.Items[i].CreationTimestamp.After(podList.Items[j].CreationTimestamp.Time)
+			})
+
+			for _, pod := range podList.Items {
+				// k.Log("Testing pod %q", pod.Name)
+
+				// If an include function is provided, only keep pods that return true
+				if include != nil && !include(pod) {
+					continue
+				}
+
+				// Handle container targeting
+				if target.Container != "" {
+					// k.Log("Testing pod %q for container %q", pod.Name, target.Container)
+
+					// Check the status of initContainers for a running match
+					for _, initContainer := range pod.Status.InitContainerStatuses {
+						isRunning := initContainer.State.Running != nil
+						if initContainer.Name == target.Container && isRunning {
+							// On running match in initContainer break this loop
+							readyPods = append(readyPods, pod)
+							break
+						}
+					}
+
+					// Check the status of regular containers for a running match
+					for _, container := range pod.Status.ContainerStatuses {
+						isRunning := container.State.Running != nil
+						if container.Name == target.Container && isRunning {
+							readyPods = append(readyPods, pod)
+							break
+						}
+					}
+				} else {
+					status := pod.Status.Phase
+					// k.Log("Testing pod %q phase, want (%q) got (%q)", pod.Name, corev1.PodRunning, status)
+					// Regular status checking without a container
+					if status == corev1.PodRunning {
+						readyPods = append(readyPods, pod)
+						break
+					}
+				}
+			}
+			if len(readyPods) > 0 {
+				return readyPods
+			}
+			timer.Reset(3 * time.Second)
+		}
+	}
+}
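Passing kubernetes.Interface into waitForPodsAndContainers (rather than hanging the helper off *K8s) makes the wait loop unit-testable with client-go's fake clientset. A rough sketch of the kind of test this enables — the fixture names are invented and no such test ships with this change:

// Sketch of a package-internal test made possible by the kubernetes.Interface
// parameter; the pod fixture below is fabricated for illustration.
package cluster

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func TestWaitForPodsAndContainers(t *testing.T) {
	// One running pod that matches the target selector.
	clientset := fake.NewSimpleClientset(&corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "worker-1",
			Namespace: "demo",
			Labels:    map[string]string{"app": "worker"},
		},
		Status: corev1.PodStatus{Phase: corev1.PodRunning},
	})

	// No container target, so the helper only checks for PodRunning.
	target := podLookup{Namespace: "demo", Selector: "app=worker"}
	pods := waitForPodsAndContainers(context.Background(), clientset, target, nil)
	if len(pods) != 1 {
		t.Fatalf("expected 1 ready pod, got %d", len(pods))
	}
}

Because the fixture pod is already Running, the loop returns on its first iteration rather than waiting out the 90-second budget.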
diff --git a/src/pkg/cluster/injector.go b/src/pkg/cluster/injector.go
index 3412553d29..64ced7eac3 100644
--- a/src/pkg/cluster/injector.go
+++ b/src/pkg/cluster/injector.go
@@ -108,7 +108,13 @@ func (c *Cluster) StartInjectionMadness(ctx context.Context, tmpDir string, imag
 		spinner.Updatef("Attempting to bootstrap with the %s/%s", node, image)
 
 		// Make sure the pod is not there first
-		err = c.DeletePod(ctx, ZarfNamespaceName, "injector")
+		deleteGracePeriod := int64(0)
+		deletePolicy := metav1.DeletePropagationForeground
+		deleteOpts := metav1.DeleteOptions{
+			GracePeriodSeconds: &deleteGracePeriod,
+			PropagationPolicy:  &deletePolicy,
+		}
+		err := c.Clientset.CoreV1().Pods(ZarfNamespaceName).Delete(ctx, "injector", deleteOpts)
 		if err != nil {
 			message.Debug("could not delete pod injector:", err)
 		}
@@ -123,7 +129,7 @@ func (c *Cluster) StartInjectionMadness(ctx context.Context, tmpDir string, imag
 		}
 
 		// Create the pod in the cluster
-		pod, err = c.CreatePod(ctx, pod)
+		pod, err = c.Clientset.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
 		if err != nil {
 			// Just debug log the output because failures just result in trying the next image
 			message.Debug("error creating pod in cluster:", pod, err)
@@ -146,7 +152,8 @@ func (c *Cluster) StartInjectionMadness(ctx context.Context, tmpDir string, imag
 // StopInjectionMadness handles cleanup once the seed registry is up.
 func (c *Cluster) StopInjectionMadness(ctx context.Context) error {
 	// Try to kill the injector pod now
-	if err := c.DeletePod(ctx, ZarfNamespaceName, "injector"); err != nil {
+	err := c.Clientset.CoreV1().Pods(ZarfNamespaceName).Delete(ctx, "injector", metav1.DeleteOptions{})
+	if err != nil {
 		return err
 	}
@@ -375,94 +382,99 @@ func (c *Cluster) createService(ctx context.Context) (*corev1.Service, error) {
 
 // buildInjectionPod return a pod for injection with the appropriate containers to perform the injection.
 func (c *Cluster) buildInjectionPod(node, image string, payloadConfigmaps []string, payloadShasum string) (*corev1.Pod, error) {
-	pod := c.GeneratePod("injector", ZarfNamespaceName)
 	executeMode := int32(0777)
-
-	pod.Labels["app"] = "zarf-injector"
-
-	// Ensure zarf agent doesn't break the injector on future runs
-	pod.Labels[k8s.AgentLabel] = "ignore"
-
-	// Bind the pod to the node the image was found on
-	pod.Spec.NodeName = node
-
-	// Do not try to restart the pod as it will be deleted/re-created instead
-	pod.Spec.RestartPolicy = corev1.RestartPolicyNever
-
-	pod.Spec.Containers = []corev1.Container{
-		{
-			Name: "injector",
-
-			// An existing image already present on the cluster
-			Image: image,
-
-			// PullIfNotPresent because some distros provide a way (even in airgap) to pull images from local or direct-connected registries
-			ImagePullPolicy: corev1.PullIfNotPresent,
-
-			// This directory's contents come from the init container output
-			WorkingDir: "/zarf-init",
-
-			// Call the injector with shasum of the tarball
-			Command: []string{"/zarf-init/zarf-injector", payloadShasum},
-
-			// Shared mount between the init and regular containers
-			VolumeMounts: []corev1.VolumeMount{
-				{
-					Name:      "init",
-					MountPath: "/zarf-init/zarf-injector",
-					SubPath:   "zarf-injector",
-				},
-				{
-					Name:      "seed",
-					MountPath: "/zarf-seed",
-				},
+	pod := &corev1.Pod{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: corev1.SchemeGroupVersion.String(),
+			Kind:       "Pod",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "injector",
+			Namespace: ZarfNamespaceName,
+			Labels: map[string]string{
+				"app": "zarf-injector",
+
+				// Ensure zarf agent doesn't break the injector on future runs
+				k8s.AgentLabel: "ignore",
 			},
+		},
+		Spec: corev1.PodSpec{
+			// Bind the pod to the node the image was found on
+			NodeName: node,
+			// Do not try to restart the pod as it will be deleted/re-created instead
+			RestartPolicy: corev1.RestartPolicyNever,
+			Containers: []corev1.Container{
+				{
+					Name: "injector",
+
+					// An existing image already present on the cluster
+					Image: image,
+
+					// PullIfNotPresent because some distros provide a way (even in airgap) to pull images from local or direct-connected registries
+					ImagePullPolicy: corev1.PullIfNotPresent,
+
+					// This directory's contents come from the init container output
+					WorkingDir: "/zarf-init",
+
+					// Call the injector with shasum of the tarball
+					Command: []string{"/zarf-init/zarf-injector", payloadShasum},
+
+					// Shared mount between the init and regular containers
+					VolumeMounts: []corev1.VolumeMount{
+						{
+							Name:      "init",
+							MountPath: "/zarf-init/zarf-injector",
+							SubPath:   "zarf-injector",
+						},
+						{
+							Name:      "seed",
+							MountPath: "/zarf-seed",
+						},
+					},
 
-			// Readiness probe to optimize the pod startup time
-			ReadinessProbe: &corev1.Probe{
-				PeriodSeconds:    2,
-				SuccessThreshold: 1,
-				FailureThreshold: 10,
-				ProbeHandler: corev1.ProbeHandler{
-					HTTPGet: &corev1.HTTPGetAction{
-						Path: "/v2/",               // path to health check
-						Port: intstr.FromInt(5000), // port to health check
+					// Readiness probe to optimize the pod startup time
+					ReadinessProbe: &corev1.Probe{
+						PeriodSeconds:    2,
+						SuccessThreshold: 1,
+						FailureThreshold: 10,
+						ProbeHandler: corev1.ProbeHandler{
+							HTTPGet: &corev1.HTTPGetAction{
+								Path: "/v2/",               // path to health check
+								Port: intstr.FromInt(5000), // port to health check
+							},
+						},
 					},
-				},
-			},
 
-			// Keep resources as light as possible as we aren't actually running the container's other binaries
-			Resources: corev1.ResourceRequirements{
-				Requests: corev1.ResourceList{
-					corev1.ResourceCPU:    injectorRequestedCPU,
-					corev1.ResourceMemory: injectorRequestedMemory,
-				},
-				Limits: corev1.ResourceList{
-					corev1.ResourceCPU:    injectorLimitCPU,
-					corev1.ResourceMemory: injectorLimitMemory,
+					// Keep resources as light as possible as we aren't actually running the container's other binaries
+					Resources: corev1.ResourceRequirements{
+						Requests: corev1.ResourceList{
+							corev1.ResourceCPU:    injectorRequestedCPU,
+							corev1.ResourceMemory: injectorRequestedMemory,
+						},
+						Limits: corev1.ResourceList{
+							corev1.ResourceCPU:    injectorLimitCPU,
+							corev1.ResourceMemory: injectorLimitMemory,
+						},
+					},
 				},
 			},
-		},
-	}
-
-	pod.Spec.Volumes = []corev1.Volume{
-		// Contains the rust binary and collection of configmaps from the tarball (seed image).
-		{
-			Name: "init",
-			VolumeSource: corev1.VolumeSource{
-				ConfigMap: &corev1.ConfigMapVolumeSource{
-					LocalObjectReference: corev1.LocalObjectReference{
-						Name: "rust-binary",
+			Volumes: []corev1.Volume{
+				// Contains the rust binary and collection of configmaps from the tarball (seed image).
+				{
+					Name: "init",
+					VolumeSource: corev1.VolumeSource{
+						ConfigMap: &corev1.ConfigMapVolumeSource{
+							LocalObjectReference: corev1.LocalObjectReference{
+								Name: "rust-binary",
+							},
+							DefaultMode: &executeMode,
+						},
+					},
+				},
+				// Empty directory to hold the seed image (new dir to avoid permission issues)
+				{
+					Name: "seed",
+					VolumeSource: corev1.VolumeSource{
+						EmptyDir: &corev1.EmptyDirVolumeSource{},
 					},
-					DefaultMode: &executeMode,
 				},
-			},
-		},
-		// Empty directory to hold the seed image (new dir to avoid permission issues)
-		{
-			Name: "seed",
-			VolumeSource: corev1.VolumeSource{
-				EmptyDir: &corev1.EmptyDirVolumeSource{},
 			},
 		},
 	}
@@ -504,14 +516,15 @@ func (c *Cluster) getImagesAndNodesForInjection(ctx context.Context) (imageNodeM
 		case <-ctx.Done():
 			return nil, fmt.Errorf("get image list timed-out: %w", ctx.Err())
 		case <-timer.C:
-			pods, err := c.GetPods(ctx, corev1.NamespaceAll, metav1.ListOptions{
+			listOpts := metav1.ListOptions{
 				FieldSelector: fmt.Sprintf("status.phase=%s", corev1.PodRunning),
-			})
+			}
+			podList, err := c.Clientset.CoreV1().Pods(corev1.NamespaceAll).List(ctx, listOpts)
 			if err != nil {
 				return nil, fmt.Errorf("unable to get the list of %q pods in the cluster: %w", corev1.PodRunning, err)
 			}
 
-			for _, pod := range pods.Items {
+			for _, pod := range podList.Items {
 				nodeName := pod.Spec.NodeName
 
 				nodeDetails, err := c.Clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
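The declarative corev1.Pod literal above replaces the old GeneratePod-then-mutate flow. For reviewers who want the pattern in miniature, here is a stripped-down, illustrative version created against a fake clientset — the image, namespace, and labels are placeholders, not the injector's real values:

// Illustrative sketch of the declarative pod-construction pattern the
// injector now uses; all names here are examples.
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// Build the whole pod as one literal instead of mutating a generated stub.
	pod := &corev1.Pod{
		TypeMeta: metav1.TypeMeta{APIVersion: corev1.SchemeGroupVersion.String(), Kind: "Pod"},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "injector",
			Namespace: "zarf",
			Labels:    map[string]string{"app": "zarf-injector"},
		},
		Spec: corev1.PodSpec{
			RestartPolicy: corev1.RestartPolicyNever,
			Containers:    []corev1.Container{{Name: "injector", Image: "registry:2"}},
		},
	}

	clientset := fake.NewSimpleClientset()
	created, err := clientset.CoreV1().Pods(pod.Namespace).Create(context.Background(), pod, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println("created:", created.Name) // created: injector
}

One literal keeps every field visible at the call site, which is arguably easier to review than the removed sequence of pod.Labels and pod.Spec assignments.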
diff --git a/src/pkg/cluster/tunnel.go b/src/pkg/cluster/tunnel.go
index 122a57abf4..f401743927 100644
--- a/src/pkg/cluster/tunnel.go
+++ b/src/pkg/cluster/tunnel.go
@@ -218,10 +218,12 @@ func (c *Cluster) checkForZarfConnectLabel(ctx context.Context, name string) (Tu
 		zt.remotePort = svc.Spec.Ports[0].TargetPort.IntValue()
 		// if targetPort == 0, look for Port (which is required)
 		if zt.remotePort == 0 {
-			zt.remotePort, err = c.FindPodContainerPort(ctx, svc)
+			// TODO: Add a check for when the container port is not found
+			remotePort, err := c.findPodContainerPort(ctx, svc)
 			if err != nil {
 				return TunnelInfo{}, err
 			}
+			zt.remotePort = remotePort
 		}
 
 		// Add the url suffix too.
@@ -235,6 +237,29 @@ func (c *Cluster) checkForZarfConnectLabel(ctx context.Context, name string) (Tu
 	return zt, nil
 }
 
+// findPodContainerPort returns the container port matching the service's named target port, or 0 with no error if none is found.
+func (c *Cluster) findPodContainerPort(ctx context.Context, svc corev1.Service) (int, error) {
+	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: svc.Spec.Selector})
+	if err != nil {
+		return 0, err
+	}
+	podList, err := c.Clientset.CoreV1().Pods(svc.Namespace).List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
+	if err != nil {
+		return 0, err
+	}
+	for _, pod := range podList.Items {
+		// Find the matching name on the port in the pod
+		for _, container := range pod.Spec.Containers {
+			for _, port := range container.Ports {
+				if port.Name == svc.Spec.Ports[0].TargetPort.String() {
+					return int(port.ContainerPort), nil
+				}
+			}
+		}
+	}
+	return 0, nil
+}
+
 // TODO: Refactor to use netip.AddrPort instead of a string for nodePortURL.
 func serviceInfoFromNodePortURL(services []corev1.Service, nodePortURL string) (string, string, int, error) {
 	// Attempt to parse as normal, if this fails add a scheme to the URL (docker registries don't use schemes)
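findPodContainerPort leans on metav1.LabelSelectorAsSelector to turn a service's .spec.selector map into the selector string that ListOptions expects. A tiny self-contained demo of just that conversion — the label keys and values are invented:

// Demo of the map -> selector-string conversion used above.
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A service's .spec.selector is a plain map; LabelSelectorAsSelector turns
	// it into a labels.Selector whose String() output is valid for
	// metav1.ListOptions.LabelSelector.
	svcSelector := map[string]string{"app": "connect-svc", "tier": "web"}
	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: svcSelector})
	if err != nil {
		panic(err)
	}
	fmt.Println(selector.String()) // app=connect-svc,tier=web
}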
diff --git a/src/pkg/k8s/common.go b/src/pkg/k8s/common.go
index 9041775825..accb12d9b1 100644
--- a/src/pkg/k8s/common.go
+++ b/src/pkg/k8s/common.go
@@ -79,7 +79,7 @@ func (k *K8s) WaitForHealthyCluster(ctx context.Context) error {
 			}
 
 			// Get the cluster pod list
-			pods, err := k.GetAllPods(ctx)
+			pods, err := k.Clientset.CoreV1().Pods(corev1.NamespaceAll).List(ctx, metav1.ListOptions{})
 			if err != nil {
 				k.Log("Could not get the pod list: %w", err)
 				timer.Reset(waitDuration)
diff --git a/src/pkg/k8s/pods.go b/src/pkg/k8s/pods.go
deleted file mode 100644
index cbdc77161b..0000000000
--- a/src/pkg/k8s/pods.go
+++ /dev/null
@@ -1,207 +0,0 @@
-// SPDX-License-Identifier: Apache-2.0
-// SPDX-FileCopyrightText: 2021-Present The Zarf Authors
-
-// Package k8s provides a client for interacting with a Kubernetes cluster.
-package k8s
-
-import (
-	"context"
-	"sort"
-	"time"
-
-	corev1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/errors"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-)
-
-// GeneratePod creates a new pod without adding it to the k8s cluster.
-func (k *K8s) GeneratePod(name, namespace string) *corev1.Pod {
-	pod := &corev1.Pod{
-		TypeMeta: metav1.TypeMeta{
-			APIVersion: corev1.SchemeGroupVersion.String(),
-			Kind:       "Pod",
-		},
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      name,
-			Namespace: namespace,
-			Labels:    make(Labels),
-		},
-	}
-
-	return pod
-}
-
-// DeletePod removes a pod from the cluster by namespace & name.
-func (k *K8s) DeletePod(ctx context.Context, namespace string, name string) error {
-	deleteGracePeriod := int64(0)
-	deletePolicy := metav1.DeletePropagationForeground
-
-	err := k.Clientset.CoreV1().Pods(namespace).Delete(ctx, name, metav1.DeleteOptions{
-		GracePeriodSeconds: &deleteGracePeriod,
-		PropagationPolicy:  &deletePolicy,
-	})
-	if err != nil {
-		return err
-	}
-
-	timer := time.NewTimer(0)
-	defer timer.Stop()
-
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		case <-timer.C:
-			_, err := k.Clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
-			if errors.IsNotFound(err) {
-				return nil
-			}
-
-			timer.Reset(1 * time.Second)
-		}
-	}
-}
-
-// DeletePods removes a collection of pods from the cluster by pod lookup.
-func (k *K8s) DeletePods(ctx context.Context, target PodLookup) error {
-	deleteGracePeriod := int64(0)
-	deletePolicy := metav1.DeletePropagationForeground
-	return k.Clientset.CoreV1().Pods(target.Namespace).DeleteCollection(
-		ctx,
-		metav1.DeleteOptions{
-			GracePeriodSeconds: &deleteGracePeriod,
-			PropagationPolicy:  &deletePolicy,
-		},
-		metav1.ListOptions{
-			LabelSelector: target.Selector,
-		},
-	)
-}
-
-// CreatePod inserts the given pod into the cluster.
-func (k *K8s) CreatePod(ctx context.Context, pod *corev1.Pod) (*corev1.Pod, error) {
-	createOptions := metav1.CreateOptions{}
-	return k.Clientset.CoreV1().Pods(pod.Namespace).Create(ctx, pod, createOptions)
-}
-
-// GetAllPods returns a list of pods from the cluster for all namespaces.
-func (k *K8s) GetAllPods(ctx context.Context) (*corev1.PodList, error) {
-	return k.GetPods(ctx, corev1.NamespaceAll, metav1.ListOptions{})
-}
-
-// GetPods returns a list of pods from the cluster by namespace.
-func (k *K8s) GetPods(ctx context.Context, namespace string, listOpts metav1.ListOptions) (*corev1.PodList, error) {
-	return k.Clientset.CoreV1().Pods(namespace).List(ctx, listOpts)
-}
-
-// WaitForPodsAndContainers attempts to find pods matching the given selector and optional inclusion filter
-// It will wait up to 90 seconds for the pods to be found and will return a list of matching pod names
-// If the timeout is reached, an empty list will be returned.
-func (k *K8s) WaitForPodsAndContainers(ctx context.Context, target PodLookup, include PodFilter) []corev1.Pod {
-	waitCtx, cancel := context.WithTimeout(ctx, 90*time.Second)
-	defer cancel()
-
-	timer := time.NewTimer(0)
-	defer timer.Stop()
-
-	for {
-		select {
-		case <-waitCtx.Done():
-			k.Log("Pod lookup failed: %v", ctx.Err())
-			return nil
-		case <-timer.C:
-			pods, err := k.GetPods(ctx, target.Namespace, metav1.ListOptions{
-				LabelSelector: target.Selector,
-			})
-			if err != nil {
-				k.Log("Unable to find matching pods: %w", err)
-				return nil
-			}
-
-			k.Log("Found %d pods for target %#v", len(pods.Items), target)
-
-			var readyPods = []corev1.Pod{}
-
-			// Sort the pods from newest to oldest
-			sort.Slice(pods.Items, func(i, j int) bool {
-				return pods.Items[i].CreationTimestamp.After(pods.Items[j].CreationTimestamp.Time)
-			})
-
-			for _, pod := range pods.Items {
-				k.Log("Testing pod %q", pod.Name)
-
-				// If an include function is provided, only keep pods that return true
-				if include != nil && !include(pod) {
-					continue
-				}
-
-				// Handle container targeting
-				if target.Container != "" {
-					k.Log("Testing pod %q for container %q", pod.Name, target.Container)
-
-					// Check the status of initContainers for a running match
-					for _, initContainer := range pod.Status.InitContainerStatuses {
-						isRunning := initContainer.State.Running != nil
-						if initContainer.Name == target.Container && isRunning {
-							// On running match in initContainer break this loop
-							readyPods = append(readyPods, pod)
-							break
-						}
-					}
-
-					// Check the status of regular containers for a running match
-					for _, container := range pod.Status.ContainerStatuses {
-						isRunning := container.State.Running != nil
-						if container.Name == target.Container && isRunning {
-							readyPods = append(readyPods, pod)
-							break
-						}
-					}
-				} else {
-					status := pod.Status.Phase
-					k.Log("Testing pod %q phase, want (%q) got (%q)", pod.Name, corev1.PodRunning, status)
-					// Regular status checking without a container
-					if status == corev1.PodRunning {
-						readyPods = append(readyPods, pod)
-						break
-					}
-				}
-			}
-			if len(readyPods) > 0 {
-				return readyPods
-			}
-			timer.Reset(3 * time.Second)
-		}
-	}
-}
-
-// FindPodContainerPort will find a pod's container port from a service and return it.
-//
-// Returns 0 if no port is found.
-func (k *K8s) FindPodContainerPort(ctx context.Context, svc corev1.Service) (int, error) {
-	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: svc.Spec.Selector})
-	if err != nil {
-		return 0, err
-	}
-	pods := k.WaitForPodsAndContainers(
-		ctx,
-		PodLookup{
-			Namespace: svc.Namespace,
-			Selector:  selector.String(),
-		},
-		nil,
-	)
-
-	for _, pod := range pods {
-		// Find the matching name on the port in the pod
-		for _, container := range pod.Spec.Containers {
-			for _, port := range container.Ports {
-				if port.Name == svc.Spec.Ports[0].TargetPort.String() {
-					return int(port.ContainerPort), nil
-				}
-			}
-		}
-	}
-
-	return 0, nil
-}
diff --git a/src/pkg/k8s/tunnel.go b/src/pkg/k8s/tunnel.go
index 7b890d2e20..e32928f825 100644
--- a/src/pkg/k8s/tunnel.go
+++ b/src/pkg/k8s/tunnel.go
@@ -14,6 +14,7 @@ import (
 	"sync"
 	"time"
 
+	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/tools/portforward"
 	"k8s.io/client-go/transport/spdy"
@@ -258,18 +259,16 @@ func (tunnel *Tunnel) getAttachablePodForService(ctx context.Context) (string, e
 	if err != nil {
 		return "", err
 	}
-
-	servicePods := tunnel.kube.WaitForPodsAndContainers(
-		ctx,
-		PodLookup{
-			Namespace: tunnel.namespace,
-			Selector:  selector.String(),
-		},
-		nil,
-	)
-
-	if len(servicePods) < 1 {
+	listOpts := metav1.ListOptions{
+		LabelSelector: selector.String(),
+		FieldSelector: fmt.Sprintf("status.phase=%s", corev1.PodRunning),
+	}
+	podList, err := tunnel.kube.Clientset.CoreV1().Pods(tunnel.namespace).List(ctx, listOpts)
+	if err != nil {
+		return "", err
+	}
+	if len(podList.Items) < 1 {
 		return "", fmt.Errorf("no pods found for service %s", tunnel.resourceName)
 	}
-	return servicePods[0].Name, nil
+	return podList.Items[0].Name, nil
 }
diff --git a/src/pkg/k8s/types.go b/src/pkg/k8s/types.go
index d52ea6389b..c109c2e9c5 100644
--- a/src/pkg/k8s/types.go
+++ b/src/pkg/k8s/types.go
@@ -5,7 +5,6 @@
 package k8s
 
 import (
-	corev1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 )
@@ -13,9 +12,6 @@ import (
 // Log is a function that logs a message.
 type Log func(string, ...any)
 
-// Labels is a map of K8s labels.
-type Labels map[string]string
-
 // K8s is a client for interacting with a Kubernetes cluster.
 type K8s struct {
 	Clientset  kubernetes.Interface
@@ -23,16 +19,6 @@ type K8s struct {
 	RestConfig *rest.Config
 	Log        Log
 }
 
-// PodLookup is a struct for specifying a pod to target for data injection or lookups.
-type PodLookup struct {
-	Namespace string `json:"namespace" jsonschema:"description=The namespace to target for data injection"`
-	Selector  string `json:"selector" jsonschema:"description=The K8s selector to target for data injection"`
-	Container string `json:"container" jsonschema:"description=The container to target for data injection"`
-}
-
-// PodFilter is a function that returns true if the pod should be targeted for data injection or lookups.
-type PodFilter func(pod corev1.Pod) bool
-
 // GeneratedPKI is a struct for storing generated PKI data.
 type GeneratedPKI struct {
 	CA []byte `json:"ca"`
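One behavioral note on the pods.go removal: the deleted DeletePod polled until the pod was actually gone, while the inline Delete calls in injector.go above do not wait. If that guarantee turns out to matter, something like the sketch below could restore it — the helper name and package are invented, and it assumes a reasonably recent apimachinery for wait.PollUntilContextTimeout:

// Hypothetical helper restoring the old "delete and wait until gone"
// semantics; not part of this change.
package podutil

import (
	"context"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

func deletePodAndWait(ctx context.Context, clientset kubernetes.Interface, namespace, name string) error {
	gracePeriod := int64(0)
	policy := metav1.DeletePropagationForeground
	err := clientset.CoreV1().Pods(namespace).Delete(ctx, name, metav1.DeleteOptions{
		GracePeriodSeconds: &gracePeriod,
		PropagationPolicy:  &policy,
	})
	if err != nil && !apierrors.IsNotFound(err) {
		return err
	}

	// Poll once per second, for up to 30 seconds, until the pod is gone.
	return wait.PollUntilContextTimeout(ctx, time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
		_, err := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
		if apierrors.IsNotFound(err) {
			return true, nil
		}
		return false, nil
	})
}

For the injector this is probably unnecessary — the pod name collision is handled by retrying across images — but it is worth keeping in mind if other callers relied on DeletePod blocking.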