diff --git a/src/cmd/connect.go b/src/cmd/connect.go index 19422df967..26f7f5a78c 100644 --- a/src/cmd/connect.go +++ b/src/cmd/connect.go @@ -13,7 +13,6 @@ import ( "github.com/defenseunicorns/zarf/src/cmd/common" "github.com/defenseunicorns/zarf/src/config/lang" "github.com/defenseunicorns/zarf/src/pkg/cluster" - "github.com/defenseunicorns/zarf/src/pkg/k8s" "github.com/defenseunicorns/zarf/src/pkg/message" "github.com/defenseunicorns/zarf/src/pkg/utils/exec" "github.com/spf13/cobra" @@ -45,7 +44,7 @@ var ( ctx := cmd.Context() - var tunnel *k8s.Tunnel + var tunnel *cluster.Tunnel if connectResourceName != "" { zt := cluster.NewTunnelInfo(connectNamespace, connectResourceType, connectResourceName, "", connectLocalPort, connectRemotePort) tunnel, err = c.ConnectTunnelInfo(ctx, zt) @@ -107,7 +106,7 @@ func init() { connectCmd.Flags().StringVar(&connectResourceName, "name", "", lang.CmdConnectFlagName) connectCmd.Flags().StringVar(&connectNamespace, "namespace", cluster.ZarfNamespaceName, lang.CmdConnectFlagNamespace) - connectCmd.Flags().StringVar(&connectResourceType, "type", k8s.SvcResource, lang.CmdConnectFlagType) + connectCmd.Flags().StringVar(&connectResourceType, "type", cluster.SvcResource, lang.CmdConnectFlagType) connectCmd.Flags().IntVar(&connectLocalPort, "local-port", 0, lang.CmdConnectFlagLocalPort) connectCmd.Flags().IntVar(&connectRemotePort, "remote-port", 0, lang.CmdConnectFlagRemotePort) connectCmd.Flags().BoolVar(&cliOnly, "cli-only", false, lang.CmdConnectFlagCliOnly) diff --git a/src/internal/packager/git/gitea.go b/src/internal/packager/git/gitea.go index d243337d3c..f00f7b1b07 100644 --- a/src/internal/packager/git/gitea.go +++ b/src/internal/packager/git/gitea.go @@ -17,7 +17,6 @@ import ( "github.com/defenseunicorns/zarf/src/config" "github.com/defenseunicorns/zarf/src/pkg/cluster" - "github.com/defenseunicorns/zarf/src/pkg/k8s" "github.com/defenseunicorns/zarf/src/pkg/message" "github.com/defenseunicorns/zarf/src/types" "k8s.io/apimachinery/pkg/runtime/schema" @@ -41,7 +40,7 @@ func (g *Git) CreateReadOnlyUser(ctx context.Context) error { } // Establish a git tunnel to send the repo - tunnel, err := c.NewTunnel(cluster.ZarfNamespaceName, k8s.SvcResource, cluster.ZarfGitServerName, "", 0, cluster.ZarfGitServerPort) + tunnel, err := c.NewTunnel(cluster.ZarfNamespaceName, cluster.SvcResource, cluster.ZarfGitServerName, "", 0, cluster.ZarfGitServerPort) if err != nil { return err } @@ -128,7 +127,7 @@ func (g *Git) UpdateGitUser(ctx context.Context, oldAdminPass string, username s return err } // Establish a git tunnel to send the repo - tunnel, err := c.NewTunnel(cluster.ZarfNamespaceName, k8s.SvcResource, cluster.ZarfGitServerName, "", 0, cluster.ZarfGitServerPort) + tunnel, err := c.NewTunnel(cluster.ZarfNamespaceName, cluster.SvcResource, cluster.ZarfGitServerName, "", 0, cluster.ZarfGitServerPort) if err != nil { return err } @@ -167,7 +166,7 @@ func (g *Git) CreatePackageRegistryToken(ctx context.Context) (CreateTokenRespon } // Establish a git tunnel to send the repo - tunnel, err := c.NewTunnel(cluster.ZarfNamespaceName, k8s.SvcResource, cluster.ZarfGitServerName, "", 0, cluster.ZarfGitServerPort) + tunnel, err := c.NewTunnel(cluster.ZarfNamespaceName, cluster.SvcResource, cluster.ZarfGitServerName, "", 0, cluster.ZarfGitServerPort) if err != nil { return CreateTokenResponse{}, err } diff --git a/src/internal/packager/images/push.go b/src/internal/packager/images/push.go index e9a3645335..1da633680b 100644 --- a/src/internal/packager/images/push.go +++ b/src/internal/packager/images/push.go @@ -11,7 +11,6 @@ import ( "github.com/defenseunicorns/pkg/helpers" "github.com/defenseunicorns/zarf/src/pkg/cluster" - "github.com/defenseunicorns/zarf/src/pkg/k8s" "github.com/defenseunicorns/zarf/src/pkg/message" "github.com/defenseunicorns/zarf/src/pkg/transform" "github.com/defenseunicorns/zarf/src/pkg/utils" @@ -48,7 +47,7 @@ func Push(ctx context.Context, cfg PushConfig) error { var ( err error - tunnel *k8s.Tunnel + tunnel *cluster.Tunnel registryURL = cfg.RegInfo.Address ) diff --git a/src/pkg/cluster/injector.go b/src/pkg/cluster/injector.go index 3412553d29..8ee218799d 100644 --- a/src/pkg/cluster/injector.go +++ b/src/pkg/cluster/injector.go @@ -276,7 +276,7 @@ func (c *Cluster) createPayloadConfigMaps(ctx context.Context, seedImagesDir, ta // Test for pod readiness and seed image presence. func (c *Cluster) injectorIsReady(ctx context.Context, seedImages []transform.Image, spinner *message.Spinner) bool { - tunnel, err := c.NewTunnel(ZarfNamespaceName, k8s.SvcResource, ZarfInjectorName, "", 0, ZarfInjectorPort) + tunnel, err := c.NewTunnel(ZarfNamespaceName, SvcResource, ZarfInjectorName, "", 0, ZarfInjectorPort) if err != nil { return false } diff --git a/src/pkg/cluster/tunnel.go b/src/pkg/cluster/tunnel.go index 122a57abf4..89e15b2b43 100644 --- a/src/pkg/cluster/tunnel.go +++ b/src/pkg/cluster/tunnel.go @@ -7,16 +7,23 @@ package cluster import ( "context" "fmt" + "io" + "net/http" "net/url" "strconv" "strings" + "sync" + "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/portforward" + "k8s.io/client-go/transport/spdy" "github.com/defenseunicorns/pkg/helpers" "github.com/defenseunicorns/zarf/src/config" - "github.com/defenseunicorns/zarf/src/pkg/k8s" "github.com/defenseunicorns/zarf/src/pkg/message" "github.com/defenseunicorns/zarf/src/types" ) @@ -36,7 +43,7 @@ const ( ZarfGitServerPort = 3000 ) -// TunnelInfo is a struct that contains the necessary info to create a new k8s.Tunnel +// TunnelInfo is a struct that contains the necessary info to create a new Tunnel type TunnelInfo struct { localPort int remotePort int @@ -88,11 +95,11 @@ func (c *Cluster) PrintConnectTable(ctx context.Context) error { } // Connect will establish a tunnel to the specified target. -func (c *Cluster) Connect(ctx context.Context, target string) (*k8s.Tunnel, error) { +func (c *Cluster) Connect(ctx context.Context, target string) (*Tunnel, error) { var err error zt := TunnelInfo{ namespace: ZarfNamespaceName, - resourceType: k8s.SvcResource, + resourceType: SvcResource, } switch strings.ToUpper(target) { @@ -134,7 +141,7 @@ func (c *Cluster) Connect(ctx context.Context, target string) (*k8s.Tunnel, erro } // ConnectTunnelInfo connects to the cluster with the provided TunnelInfo -func (c *Cluster) ConnectTunnelInfo(ctx context.Context, zt TunnelInfo) (*k8s.Tunnel, error) { +func (c *Cluster) ConnectTunnelInfo(ctx context.Context, zt TunnelInfo) (*Tunnel, error) { tunnel, err := c.NewTunnel(zt.namespace, zt.resourceType, zt.resourceName, zt.urlSuffix, zt.localPort, zt.remotePort) if err != nil { return nil, err @@ -149,14 +156,14 @@ func (c *Cluster) ConnectTunnelInfo(ctx context.Context, zt TunnelInfo) (*k8s.Tu } // ConnectToZarfRegistryEndpoint determines if a registry endpoint is in cluster, and if so opens a tunnel to connect to it -func (c *Cluster) ConnectToZarfRegistryEndpoint(ctx context.Context, registryInfo types.RegistryInfo) (string, *k8s.Tunnel, error) { +func (c *Cluster) ConnectToZarfRegistryEndpoint(ctx context.Context, registryInfo types.RegistryInfo) (string, *Tunnel, error) { registryEndpoint := registryInfo.Address var err error - var tunnel *k8s.Tunnel + var tunnel *Tunnel if registryInfo.InternalRegistry { // Establish a registry tunnel to send the images to the zarf registry - if tunnel, err = c.NewTunnel(ZarfNamespaceName, k8s.SvcResource, ZarfRegistryName, "", 0, ZarfRegistryPort); err != nil { + if tunnel, err = c.NewTunnel(ZarfNamespaceName, SvcResource, ZarfRegistryName, "", 0, ZarfRegistryPort); err != nil { return "", tunnel, err } } else { @@ -168,7 +175,7 @@ func (c *Cluster) ConnectToZarfRegistryEndpoint(ctx context.Context, registryInf // If this is a service (no error getting svcInfo), create a port-forward tunnel to that resource if err == nil { - if tunnel, err = c.NewTunnel(namespace, k8s.SvcResource, name, "", 0, port); err != nil { + if tunnel, err = c.NewTunnel(namespace, SvcResource, name, "", 0, port); err != nil { return "", tunnel, err } } @@ -211,7 +218,7 @@ func (c *Cluster) checkForZarfConnectLabel(ctx context.Context, name string) (Tu svc := serviceList.Items[0] // Reset based on the matched params. - zt.resourceType = k8s.SvcResource + zt.resourceType = SvcResource zt.resourceName = svc.Name zt.namespace = svc.Namespace // Only support a service with a single port. @@ -273,3 +280,255 @@ func serviceInfoFromNodePortURL(services []corev1.Service, nodePortURL string) ( return "", "", 0, fmt.Errorf("no matching node port services found") } + +// Global lock to synchronize port selections. +var globalMutex sync.Mutex + +// Zarf Tunnel Configuration Constants. +const ( + PodResource = "pod" + SvcResource = "svc" +) + +// Tunnel is the main struct that configures and manages port forwarding tunnels to Kubernetes resources. +type Tunnel struct { + clientset kubernetes.Interface + restConfig *rest.Config + out io.Writer + localPort int + remotePort int + namespace string + resourceType string + resourceName string + urlSuffix string + attempt int + stopChan chan struct{} + readyChan chan struct{} + errChan chan error +} + +// NewTunnel will create a new Tunnel struct. +// Note that if you use 0 for the local port, an open port on the host system +// will be selected automatically, and the Tunnel struct will be updated with the selected port. +func (c *Cluster) NewTunnel(namespace, resourceType, resourceName, urlSuffix string, local, remote int) (*Tunnel, error) { + return &Tunnel{ + clientset: c.Clientset, + restConfig: c.RestConfig, + out: io.Discard, + localPort: local, + remotePort: remote, + namespace: namespace, + resourceType: resourceType, + resourceName: resourceName, + urlSuffix: urlSuffix, + stopChan: make(chan struct{}, 1), + readyChan: make(chan struct{}, 1), + }, nil +} + +// Wrap takes a function that returns an error and wraps it to check for tunnel errors as well. +func (tunnel *Tunnel) Wrap(function func() error) error { + var err error + funcErrChan := make(chan error) + + go func() { + funcErrChan <- function() + }() + + select { + case err = <-funcErrChan: + return err + case err = <-tunnel.ErrChan(): + return err + } +} + +// Connect will establish a tunnel to the specified target. +func (tunnel *Tunnel) Connect(ctx context.Context) (string, error) { + url, err := tunnel.establish(ctx) + + // Try to establish the tunnel up to 3 times. + if err != nil { + tunnel.attempt++ + + // If we have exceeded the number of attempts, exit with an error. + if tunnel.attempt > 3 { + return "", fmt.Errorf("unable to establish tunnel after 3 attempts: %w", err) + } + + // Otherwise, retry the connection but delay increasing intervals between attempts. + delay := tunnel.attempt * 10 + message.Debugf("%s", err.Error()) + message.Debugf("Delay creating tunnel, waiting %d seconds...", delay) + + timer := time.NewTimer(0) + defer timer.Stop() + + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-timer.C: + url, err = tunnel.Connect(ctx) + if err != nil { + return "", err + } + + timer.Reset(time.Duration(delay) * time.Second) + } + } + + return url, nil +} + +// Endpoint returns the tunnel ip address and port (i.e. for docker registries) +func (tunnel *Tunnel) Endpoint() string { + return fmt.Sprintf("%s:%d", helpers.IPV4Localhost, tunnel.localPort) +} + +// ErrChan returns the tunnel's error channel +func (tunnel *Tunnel) ErrChan() chan error { + return tunnel.errChan +} + +// HTTPEndpoint returns the tunnel endpoint as a HTTP URL string. +func (tunnel *Tunnel) HTTPEndpoint() string { + return fmt.Sprintf("http://%s", tunnel.Endpoint()) +} + +// FullURL returns the tunnel endpoint as a HTTP URL string with the urlSuffix appended. +func (tunnel *Tunnel) FullURL() string { + return fmt.Sprintf("%s%s", tunnel.HTTPEndpoint(), tunnel.urlSuffix) +} + +// Close disconnects a tunnel connection by closing the StopChan, thereby stopping the goroutine. +func (tunnel *Tunnel) Close() { + close(tunnel.stopChan) +} + +// establish opens a tunnel to a kubernetes resource, as specified by the provided tunnel struct. +func (tunnel *Tunnel) establish(ctx context.Context) (string, error) { + var err error + + // Track this locally as we may need to retry if the tunnel fails. + localPort := tunnel.localPort + + // If the local-port is 0, get an available port before continuing. We do this here instead of relying on the + // underlying port-forwarder library, because the port-forwarder library does not expose the selected local port in a + // machine-readable manner. + // Synchronize on the global lock to avoid race conditions with concurrently selecting the same available port, + // since there is a brief moment between `GetAvailablePort` and `forwarder.ForwardPorts` where the selected port + // is available for selection again. + if localPort == 0 { + message.Debugf("Requested local port is 0. Selecting an open port on host system") + localPort, err = helpers.GetAvailablePort() + if err != nil { + return "", fmt.Errorf("unable to find an available port: %w", err) + } + message.Debugf("Selected port %d", localPort) + globalMutex.Lock() + defer globalMutex.Unlock() + } + + msg := fmt.Sprintf("Opening tunnel %d -> %d for %s/%s in namespace %s", + localPort, + tunnel.remotePort, + tunnel.resourceType, + tunnel.resourceName, + tunnel.namespace, + ) + message.Debugf(msg) + + // Find the pod to port forward to + podName, err := tunnel.getAttachablePodForResource(ctx) + if err != nil { + return "", fmt.Errorf("unable to find pod attached to given resource: %w", err) + } + message.Debugf("Selected pod %s to open port forward to", podName) + + // Build url to the port forward endpoint. + // Example: http://localhost:8080/api/v1/namespaces/helm/pods/tiller-deploy-9itlq/portforward. + postEndpoint := tunnel.clientset.CoreV1().RESTClient().Post() + namespace := tunnel.namespace + portForwardCreateURL := postEndpoint. + Resource("pods"). + Namespace(namespace). + Name(podName). + SubResource("portforward"). + URL() + + message.Debugf("Using URL %s to create portforward", portForwardCreateURL) + + // Construct the spdy client required by the client-go portforward library. + transport, upgrader, err := spdy.RoundTripperFor(tunnel.restConfig) + if err != nil { + return "", fmt.Errorf("unable to create the spdy client %w", err) + } + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", portForwardCreateURL) + + // Construct a new PortForwarder struct that manages the instructed port forward tunnel. + ports := []string{fmt.Sprintf("%d:%d", localPort, tunnel.remotePort)} + portforwarder, err := portforward.New(dialer, ports, tunnel.stopChan, tunnel.readyChan, tunnel.out, tunnel.out) + if err != nil { + return "", fmt.Errorf("unable to create the port forward: %w", err) + } + + // Open the tunnel in a goroutine so that it is available in the background. Report errors to the main goroutine via + // a new channel. + errChan := make(chan error) + go func() { + errChan <- portforwarder.ForwardPorts() + }() + + // Wait for an error or the tunnel to be ready. + select { + case err = <-errChan: + return "", fmt.Errorf("unable to start the tunnel: %w", err) + case <-portforwarder.Ready: + // Store for endpoint output + tunnel.localPort = localPort + url := tunnel.FullURL() + + // Store the error channel to listen for errors + tunnel.errChan = errChan + + message.Debugf("Creating port forwarding tunnel at %s", url) + return url, nil + } +} + +// getAttachablePodForResource will find a pod that can be port forwarded to the provided resource type and return +// the name. +func (tunnel *Tunnel) getAttachablePodForResource(ctx context.Context) (string, error) { + switch tunnel.resourceType { + case PodResource: + return tunnel.resourceName, nil + case SvcResource: + return tunnel.getAttachablePodForService(ctx) + default: + return "", fmt.Errorf("unknown resource type: %s", tunnel.resourceType) + } +} + +// getAttachablePodForService will find an active pod associated with the Service and return the pod name. +func (tunnel *Tunnel) getAttachablePodForService(ctx context.Context) (string, error) { + service, err := tunnel.clientset.CoreV1().Services(tunnel.namespace).Get(ctx, tunnel.resourceName, metav1.GetOptions{}) + if err != nil { + return "", fmt.Errorf("unable to find the service: %w", err) + } + selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: service.Spec.Selector}) + if err != nil { + return "", err + } + listOpt := metav1.ListOptions{ + LabelSelector: selector.String(), + FieldSelector: fmt.Sprintf("status.phase=%s", corev1.PodRunning), + } + podList, err := tunnel.clientset.CoreV1().Pods(tunnel.namespace).List(ctx, listOpt) + if err != nil { + return "", err + } + if len(podList.Items) < 1 { + return "", fmt.Errorf("no pods found for service %s", tunnel.resourceName) + } + return podList.Items[0].Name, nil +} diff --git a/src/pkg/k8s/tunnel.go b/src/pkg/k8s/tunnel.go deleted file mode 100644 index 7b890d2e20..0000000000 --- a/src/pkg/k8s/tunnel.go +++ /dev/null @@ -1,275 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: 2021-Present The Zarf Authors - -// Package k8s provides a client for interacting with a Kubernetes cluster. -package k8s - -// Forked from https://github.com/gruntwork-io/terratest/blob/v0.38.8/modules/k8s/tunnel.go - -import ( - "context" - "fmt" - "io" - "net/http" - "sync" - "time" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/portforward" - "k8s.io/client-go/transport/spdy" - - "github.com/defenseunicorns/pkg/helpers" -) - -// Global lock to synchronize port selections. -var globalMutex sync.Mutex - -// Zarf Tunnel Configuration Constants. -const ( - PodResource = "pod" - SvcResource = "svc" -) - -// Tunnel is the main struct that configures and manages port forwarding tunnels to Kubernetes resources. -type Tunnel struct { - kube *K8s - out io.Writer - localPort int - remotePort int - namespace string - resourceType string - resourceName string - urlSuffix string - attempt int - stopChan chan struct{} - readyChan chan struct{} - errChan chan error -} - -// NewTunnel will create a new Tunnel struct. -// Note that if you use 0 for the local port, an open port on the host system -// will be selected automatically, and the Tunnel struct will be updated with the selected port. -func (k *K8s) NewTunnel(namespace, resourceType, resourceName, urlSuffix string, local, remote int) (*Tunnel, error) { - return &Tunnel{ - out: io.Discard, - localPort: local, - remotePort: remote, - namespace: namespace, - resourceType: resourceType, - resourceName: resourceName, - urlSuffix: urlSuffix, - stopChan: make(chan struct{}, 1), - readyChan: make(chan struct{}, 1), - kube: k, - }, nil -} - -// Wrap takes a function that returns an error and wraps it to check for tunnel errors as well. -func (tunnel *Tunnel) Wrap(function func() error) error { - var err error - funcErrChan := make(chan error) - - go func() { - funcErrChan <- function() - }() - - select { - case err = <-funcErrChan: - return err - case err = <-tunnel.ErrChan(): - return err - } -} - -// Connect will establish a tunnel to the specified target. -func (tunnel *Tunnel) Connect(ctx context.Context) (string, error) { - url, err := tunnel.establish(ctx) - - // Try to establish the tunnel up to 3 times. - if err != nil { - tunnel.attempt++ - - // If we have exceeded the number of attempts, exit with an error. - if tunnel.attempt > 3 { - return "", fmt.Errorf("unable to establish tunnel after 3 attempts: %w", err) - } - - // Otherwise, retry the connection but delay increasing intervals between attempts. - delay := tunnel.attempt * 10 - tunnel.kube.Log("%s", err.Error()) - tunnel.kube.Log("Delay creating tunnel, waiting %d seconds...", delay) - - timer := time.NewTimer(0) - defer timer.Stop() - - select { - case <-ctx.Done(): - return "", ctx.Err() - case <-timer.C: - url, err = tunnel.Connect(ctx) - if err != nil { - return "", err - } - - timer.Reset(time.Duration(delay) * time.Second) - } - } - - return url, nil -} - -// Endpoint returns the tunnel ip address and port (i.e. for docker registries) -func (tunnel *Tunnel) Endpoint() string { - return fmt.Sprintf("%s:%d", helpers.IPV4Localhost, tunnel.localPort) -} - -// ErrChan returns the tunnel's error channel -func (tunnel *Tunnel) ErrChan() chan error { - return tunnel.errChan -} - -// HTTPEndpoint returns the tunnel endpoint as a HTTP URL string. -func (tunnel *Tunnel) HTTPEndpoint() string { - return fmt.Sprintf("http://%s", tunnel.Endpoint()) -} - -// FullURL returns the tunnel endpoint as a HTTP URL string with the urlSuffix appended. -func (tunnel *Tunnel) FullURL() string { - return fmt.Sprintf("%s%s", tunnel.HTTPEndpoint(), tunnel.urlSuffix) -} - -// Close disconnects a tunnel connection by closing the StopChan, thereby stopping the goroutine. -func (tunnel *Tunnel) Close() { - close(tunnel.stopChan) -} - -// establish opens a tunnel to a kubernetes resource, as specified by the provided tunnel struct. -func (tunnel *Tunnel) establish(ctx context.Context) (string, error) { - var err error - - // Track this locally as we may need to retry if the tunnel fails. - localPort := tunnel.localPort - - // If the local-port is 0, get an available port before continuing. We do this here instead of relying on the - // underlying port-forwarder library, because the port-forwarder library does not expose the selected local port in a - // machine-readable manner. - // Synchronize on the global lock to avoid race conditions with concurrently selecting the same available port, - // since there is a brief moment between `GetAvailablePort` and `forwarder.ForwardPorts` where the selected port - // is available for selection again. - if localPort == 0 { - tunnel.kube.Log("Requested local port is 0. Selecting an open port on host system") - localPort, err = helpers.GetAvailablePort() - if err != nil { - return "", fmt.Errorf("unable to find an available port: %w", err) - } - tunnel.kube.Log("Selected port %d", localPort) - globalMutex.Lock() - defer globalMutex.Unlock() - } - - message := fmt.Sprintf("Opening tunnel %d -> %d for %s/%s in namespace %s", - localPort, - tunnel.remotePort, - tunnel.resourceType, - tunnel.resourceName, - tunnel.namespace, - ) - - tunnel.kube.Log(message) - - // Find the pod to port forward to - podName, err := tunnel.getAttachablePodForResource(ctx) - if err != nil { - return "", fmt.Errorf("unable to find pod attached to given resource: %w", err) - } - tunnel.kube.Log("Selected pod %s to open port forward to", podName) - - // Build url to the port forward endpoint. - // Example: http://localhost:8080/api/v1/namespaces/helm/pods/tiller-deploy-9itlq/portforward. - postEndpoint := tunnel.kube.Clientset.CoreV1().RESTClient().Post() - namespace := tunnel.namespace - portForwardCreateURL := postEndpoint. - Resource("pods"). - Namespace(namespace). - Name(podName). - SubResource("portforward"). - URL() - - tunnel.kube.Log("Using URL %s to create portforward", portForwardCreateURL) - - // Construct the spdy client required by the client-go portforward library. - transport, upgrader, err := spdy.RoundTripperFor(tunnel.kube.RestConfig) - if err != nil { - return "", fmt.Errorf("unable to create the spdy client %w", err) - } - dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", portForwardCreateURL) - - // Construct a new PortForwarder struct that manages the instructed port forward tunnel. - ports := []string{fmt.Sprintf("%d:%d", localPort, tunnel.remotePort)} - portforwarder, err := portforward.New(dialer, ports, tunnel.stopChan, tunnel.readyChan, tunnel.out, tunnel.out) - if err != nil { - return "", fmt.Errorf("unable to create the port forward: %w", err) - } - - // Open the tunnel in a goroutine so that it is available in the background. Report errors to the main goroutine via - // a new channel. - errChan := make(chan error) - go func() { - errChan <- portforwarder.ForwardPorts() - }() - - // Wait for an error or the tunnel to be ready. - select { - case err = <-errChan: - return "", fmt.Errorf("unable to start the tunnel: %w", err) - case <-portforwarder.Ready: - // Store for endpoint output - tunnel.localPort = localPort - url := tunnel.FullURL() - - // Store the error channel to listen for errors - tunnel.errChan = errChan - - tunnel.kube.Log("Creating port forwarding tunnel at %s", url) - return url, nil - } -} - -// getAttachablePodForResource will find a pod that can be port forwarded to the provided resource type and return -// the name. -func (tunnel *Tunnel) getAttachablePodForResource(ctx context.Context) (string, error) { - switch tunnel.resourceType { - case PodResource: - return tunnel.resourceName, nil - case SvcResource: - return tunnel.getAttachablePodForService(ctx) - default: - return "", fmt.Errorf("unknown resource type: %s", tunnel.resourceType) - } -} - -// getAttachablePodForService will find an active pod associated with the Service and return the pod name. -func (tunnel *Tunnel) getAttachablePodForService(ctx context.Context) (string, error) { - service, err := tunnel.kube.Clientset.CoreV1().Services(tunnel.namespace).Get(ctx, tunnel.resourceName, metav1.GetOptions{}) - if err != nil { - return "", fmt.Errorf("unable to find the service: %w", err) - } - selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: service.Spec.Selector}) - if err != nil { - return "", err - } - - servicePods := tunnel.kube.WaitForPodsAndContainers( - ctx, - PodLookup{ - Namespace: tunnel.namespace, - Selector: selector.String(), - }, - nil, - ) - - if len(servicePods) < 1 { - return "", fmt.Errorf("no pods found for service %s", tunnel.resourceName) - } - return servicePods[0].Name, nil -} diff --git a/src/pkg/packager/deploy.go b/src/pkg/packager/deploy.go index 2805c96620..f854936f29 100644 --- a/src/pkg/packager/deploy.go +++ b/src/pkg/packager/deploy.go @@ -25,7 +25,6 @@ import ( "github.com/defenseunicorns/zarf/src/internal/packager/images" "github.com/defenseunicorns/zarf/src/internal/packager/template" "github.com/defenseunicorns/zarf/src/pkg/cluster" - "github.com/defenseunicorns/zarf/src/pkg/k8s" "github.com/defenseunicorns/zarf/src/pkg/layout" "github.com/defenseunicorns/zarf/src/pkg/message" "github.com/defenseunicorns/zarf/src/pkg/packager/actions" @@ -527,7 +526,7 @@ func (p *Packager) pushReposToRepository(ctx context.Context, reposPath string, } } - tunnel, err := p.cluster.NewTunnel(namespace, k8s.SvcResource, name, "", 0, port) + tunnel, err := p.cluster.NewTunnel(namespace, cluster.SvcResource, name, "", 0, port) if err != nil { return err }