Implement log fetching #243

Merged: 4 commits, Dec 18, 2023

1 change: 1 addition & 0 deletions cmd/ssh-portal/main.go
@@ -1,3 +1,4 @@
// Package main implements the ssh-portal executable.
package main

import (
13 changes: 7 additions & 6 deletions cmd/ssh-portal/serve.go
@@ -16,11 +16,12 @@ import (

// ServeCmd represents the serve command.
type ServeCmd struct {
NATSServer string `kong:"required,env='NATS_URL',help='NATS server URL (nats://... or tls://...)'"`
SSHServerPort uint `kong:"default='2222',env='SSH_SERVER_PORT',help='Port the SSH server will listen on for SSH client connections'"`
HostKeyECDSA string `kong:"env='HOST_KEY_ECDSA',help='PEM encoded ECDSA host key'"`
HostKeyED25519 string `kong:"env='HOST_KEY_ED25519',help='PEM encoded Ed25519 host key'"`
HostKeyRSA string `kong:"env='HOST_KEY_RSA',help='PEM encoded RSA host key'"`
NATSServer string `kong:"required,env='NATS_URL',help='NATS server URL (nats://... or tls://...)'"`
SSHServerPort uint `kong:"default='2222',env='SSH_SERVER_PORT',help='Port the SSH server will listen on for SSH client connections'"`
HostKeyECDSA string `kong:"env='HOST_KEY_ECDSA',help='PEM encoded ECDSA host key'"`
HostKeyED25519 string `kong:"env='HOST_KEY_ED25519',help='PEM encoded Ed25519 host key'"`
HostKeyRSA string `kong:"env='HOST_KEY_RSA',help='PEM encoded RSA host key'"`
LogAccessEnabled bool `kong:"env='LOG_ACCESS_ENABLED',help='Allow any user who can SSH into a pod to also access its logs.'"`
}

// Run the serve command to handle SSH connection requests.
@@ -72,5 +73,5 @@ func (cmd *ServeCmd) Run(log *zap.Logger) error {
}
}
// start serving SSH connection requests
return sshserver.Serve(ctx, log, nc, l, c, hostkeys)
return sshserver.Serve(ctx, log, nc, l, c, hostkeys, cmd.LogAccessEnabled)
}
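
For illustration, the new LogAccessEnabled option above is populated by kong from its struct tag, the same way as the existing host key and NATS settings, and is then threaded through to sshserver.Serve. A minimal standalone sketch of how such an env-backed flag is parsed (a toy program, not the real ServeCmd; only the struct tag is taken from the diff):

package main

import (
    "fmt"

    "github.com/alecthomas/kong"
)

// cli is a trimmed-down stand-in for ServeCmd, keeping only the new flag.
var cli struct {
    LogAccessEnabled bool `kong:"env='LOG_ACCESS_ENABLED',help='Allow any user who can SSH into a pod to also access its logs.'"`
}

func main() {
    // kong resolves the env tag, so LOG_ACCESS_ENABLED=true in the environment
    // enables the flag without any command-line argument.
    kong.Parse(&cli)
    fmt.Println("log access enabled:", cli.LogAccessEnabled)
}
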
3 changes: 2 additions & 1 deletion go.mod
@@ -18,7 +18,9 @@ require (
go.opentelemetry.io/otel v1.21.0
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.16.0
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
golang.org/x/oauth2 v0.15.0
golang.org/x/sync v0.3.0
k8s.io/api v0.28.4
k8s.io/apimachinery v0.28.4
k8s.io/client-go v0.28.4
@@ -65,7 +67,6 @@ require (
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -181,6 +181,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
10 changes: 8 additions & 2 deletions internal/k8s/client.go
@@ -3,6 +3,7 @@
package k8s

import (
"sync"
"time"

"k8s.io/client-go/kubernetes"
@@ -14,10 +15,15 @@ const (
timeout = 90 * time.Second
)

// timeoutSeconds defines the common timeout for k8s API operations in the type
// required by metav1.ListOptions.
var timeoutSeconds = int64(timeout / time.Second)

// Client is a k8s client.
type Client struct {
config *rest.Config
clientset *kubernetes.Clientset
config *rest.Config
clientset *kubernetes.Clientset
logStreamIDs sync.Map
}

// NewClient creates a new kubernetes API client.
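
For illustration, the new logStreamIDs sync.Map on Client is used by readLogs in logs.go (below) to avoid opening two log streams for the same container within one request: LoadOrStore is called with a requestID+containerID key, and containers whose key already exists are skipped. A self-contained sketch of that de-duplication pattern, with made-up key values:

package main

import (
    "fmt"
    "sync"
)

// claimStream records that a log stream has been started for the given
// request/container pair, returning false if another goroutine already
// claimed it. This mirrors how readLogs skips containers that are already
// being streamed for the same request.
func claimStream(ids *sync.Map, requestID, containerID string) bool {
    _, exists := ids.LoadOrStore(requestID+containerID, true)
    return !exists
}

func main() {
    var ids sync.Map
    fmt.Println(claimStream(&ids, "req-1", "containerd://abc")) // true: first claim wins
    fmt.Println(claimStream(&ids, "req-1", "containerd://abc")) // false: duplicate skipped
    fmt.Println(claimStream(&ids, "req-2", "containerd://abc")) // true: different request
    // in logs.go the entry is also deleted (after a short pause) once the
    // container's log stream closes, so later requests can stream it afresh
}
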
3 changes: 2 additions & 1 deletion internal/k8s/finddeployment.go
@@ -13,7 +13,8 @@ func (c *Client) FindDeployment(ctx context.Context, namespace,
service string) (string, error) {
deployments, err := c.clientset.AppsV1().Deployments(namespace).
List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("lagoon.sh/service=%s", service),
LabelSelector: fmt.Sprintf("lagoon.sh/service=%s", service),
TimeoutSeconds: &timeoutSeconds,
})
if err != nil {
return "", fmt.Errorf("couldn't list deployments: %v", err)
294 changes: 294 additions & 0 deletions internal/k8s/logs.go
@@ -0,0 +1,294 @@
package k8s

import (
"bufio"
"context"
"fmt"
"io"
"sync"
"time"

"github.com/google/uuid"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)

var (
// defaultTailLines is the number of log lines to tail by default if no number
// is specified
defaultTailLines int64 = 32
// maxTailLines is the maximum number of log lines to tail
maxTailLines int64 = 1024
// limitBytes defines the maximum number of bytes of logs returned from a
// single container
limitBytes int64 = 1 * 1024 * 1024 // 1MiB
)

// linewiseCopy reads strings separated by \n from logStream, and writes them
// with the given prefix and \n stripped to the logs channel. It returns when
// ctx is cancelled or the logStream closes.
func linewiseCopy(ctx context.Context, prefix string, logs chan<- string,
logStream io.ReadCloser) {
defer logStream.Close()
s := bufio.NewScanner(logStream)
for s.Scan() {
select {
case logs <- fmt.Sprintf("%s %s", prefix, s.Text()):
case <-ctx.Done():
return
}
}
}

// readLogs reads logs from the given pod, writing them back to the logs
// channel in a linewise manner. A goroutine is started via egSend to tail logs
// for each container. requestID is used to de-duplicate simultaneous logs
// requests associated with a single call to the higher-level Logs() function.
//
// readLogs returns immediately, and relies on ctx cancellation to ensure the
// goroutines it starts are cleaned up.
func (c *Client) readLogs(ctx context.Context, requestID string,
egSend *errgroup.Group, p *corev1.Pod, containerName string, follow bool,
tailLines int64, logs chan<- string) error {
var cStatuses []corev1.ContainerStatus
// if containerName is not specified, send logs for all containers
if containerName == "" {
cStatuses = p.Status.ContainerStatuses
} else {
for _, cStatus := range p.Status.ContainerStatuses {
if containerName == cStatus.Name {
cStatuses = append(cStatuses, cStatus)
break
}
}
if len(cStatuses) == 0 {
return fmt.Errorf("couldn't find container: %s", containerName)
}
}
for _, cStatus := range cStatuses {
// skip setting up another log stream if container is already being logged
_, exists := c.logStreamIDs.LoadOrStore(requestID+cStatus.ContainerID, true)
if exists {
continue
}
// set up stream for a single container
req := c.clientset.CoreV1().Pods(p.Namespace).GetLogs(p.Name,
&corev1.PodLogOptions{
Container: cStatus.Name,
Follow: follow,
Timestamps: true,
TailLines: &tailLines,
LimitBytes: &limitBytes,
})
logStream, err := req.Stream(ctx)
if err != nil {
return fmt.Errorf("couldn't stream logs: %v", err)
}
// copy loop vars so they can be referenced in the closure
cName := cStatus.Name
cID := cStatus.ContainerID
egSend.Go(func() error {
defer c.logStreamIDs.Delete(cID)
linewiseCopy(ctx, fmt.Sprintf("[pod/%s/%s]", p.Name, cName), logs,
logStream)
// When a pod is terminating, the k8s API sometimes sends an event
// showing a healthy pod _after_ an existing logStream for the same pod
// has closed. This happens occasionally on scale-down of a deployment.
// When this occurs there is a race where linewiseCopy() returns, then
// the "healthy" event comes in and linewiseCopy() is called again, only
// to return immediately. This can result in duplicated log lines being
// returned on the logs channel.
// To hack around this behaviour, pause here before exiting. This means
// that the container ID is retained in c.logStreamIDs for a brief period
// after logs stop streaming, which causes "healthy pod" events from the
// k8s API to be ignored for that period and thereby avoiding duplicate
// log lines being returned to the caller.
time.Sleep(time.Second)
return nil
})
}
return nil
}

// podEventHandler receives pod objects from the podInformer and, if they are
// in a ready state, starts streaming logs from them.
func (c *Client) podEventHandler(ctx context.Context,
cancel context.CancelFunc, requestID string, egSend *errgroup.Group,
container string, follow bool, tailLines int64, logs chan<- string, obj any) {
// panic if obj is not a pod, since we specifically use a pod informer
pod := obj.(*corev1.Pod)
if !slices.ContainsFunc(pod.Status.Conditions,
func(cond corev1.PodCondition) bool {
return cond.Type == corev1.ContainersReady &&
cond.Status == corev1.ConditionTrue
}) {
return // pod not ready
}
egSend.Go(func() error {
readLogsErr := c.readLogs(ctx, requestID, egSend, pod, container, follow,
tailLines, logs)
if readLogsErr != nil {
cancel()
return fmt.Errorf("couldn't read logs on new pod: %v", readLogsErr)
}
return nil
})
}

// newPodInformer sets up a k8s informer on pods in the given deployment, and
// returns the informer in an inert state. The informer is configured with
// event handlers to read logs from pods in the deployment, writing log lines
// back to the logs channel. It transparently handles the deployment scaling up
// and down (e.g. pods being added / deleted / restarted).
//
// When the caller calls Run() on the returned informer, it will start watching
// for events and sending to the logs channel.
func (c *Client) newPodInformer(ctx context.Context,
cancel context.CancelFunc, requestID string, egSend *errgroup.Group,
namespace, deployment, container string, follow bool, tailLines int64,
logs chan<- string) (cache.SharedIndexInformer, error) {
// get the deployment
d, err := c.clientset.AppsV1().Deployments(namespace).Get(ctx, deployment,
metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("couldn't get deployment: %v", err)
}
// configure the informer factory, filtering on deployment selector labels
factory := informers.NewSharedInformerFactoryWithOptions(c.clientset,
time.Hour, informers.WithNamespace(namespace),
informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
opts.LabelSelector = labels.SelectorFromSet(
d.Spec.Selector.MatchLabels).String()
}))
// construct the informer
podInformer := factory.Core().V1().Pods().Informer()
_, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
// AddFunc handles events for new and existing pods. Since new pods are not
// in a ready state when initially added, it doesn't start log streaming
// for those.
AddFunc: func(obj any) {
c.podEventHandler(ctx, cancel, requestID, egSend, container, follow,
tailLines, logs, obj)
},
// UpdateFunc handles events for pod state changes. When new pods are added
// (e.g. deployment is scaled up) it repeatedly receives events until the
// pod is in its final healthy state. For that reason, the
// podEventHandler() inspects the pod state before initiating log
// streaming.
UpdateFunc: func(_, obj any) {
c.podEventHandler(ctx, cancel, requestID, egSend, container, follow,
tailLines, logs, obj)
},
})
if err != nil {
return nil, fmt.Errorf("couldn't add event handlers to informer: %v", err)
}
return podInformer, nil
}

// Logs takes a target namespace, deployment, and stdio stream, and writes the
// log output of the pods of the deployment to the stdio stream. If
// container is specified, only logs of this container within the deployment
// are returned.
//
// This function exits on one of the following events:
//
// 1. It finishes sending the logs of the pods. This only occurs if
// follow=false.
// 2. ctx is cancelled (signalling that the SSH channel was closed).
// 3. An unrecoverable error occurs.
func (c *Client) Logs(ctx context.Context,
namespace, deployment, container string, follow bool, tailLines int64,
stdio io.ReadWriter) error {
// Wrap the context so we can cancel subroutines of this function on error.
childCtx, cancel := context.WithCancel(ctx)
defer cancel()
// Generate a requestID value to uniquely distinguish between multiple calls
// to this function. This requestID is used in readLogs() to distinguish
// entries in c.logStreamIDs.
requestID := uuid.New().String()
// clamp tailLines
if tailLines < 1 {
tailLines = defaultTailLines
}
if tailLines > maxTailLines {
tailLines = maxTailLines
}
// put sending goroutines in an errgroup.Group to handle errors, and
// receiving goroutines in a waitgroup (since they have no errors)
var egSend errgroup.Group
var wgRecv sync.WaitGroup
// initialise a buffered channel for the worker goroutines to write to, and
// for this function to read log lines from
logs := make(chan string, 4)
// start a goroutine reading from the logs channel and writing back to stdio
wgRecv.Add(1)
go func() {
defer wgRecv.Done()
for {
select {
case msg := <-logs:
// ignore errors writing to stdio. this may happen if the client
// disconnects after reading off the channel but before the log can be
// written. there's nothing we can do in this case and we'll select
// ctx.Done() shortly anyway.
_, _ = fmt.Fprintln(stdio, msg)
case <-childCtx.Done():
return // context done - client went away or error within Logs()
}
}
}()
if follow {
// If following the logs, start a goroutine which watches for new (and
// existing) pods in the deployment and starts streaming logs from them.
egSend.Go(func() error {
podInformer, err := c.newPodInformer(childCtx, cancel, requestID,
&egSend, namespace, deployment, container, follow, tailLines, logs)
if err != nil {
return fmt.Errorf("couldn't construct new pod informer: %v", err)
}
podInformer.Run(childCtx.Done())
return nil
})
} else {
// If not following the logs, avoid constructing an informer. Instead just
// read the logs from all existing pods.
d, err := c.clientset.AppsV1().Deployments(namespace).Get(childCtx,
deployment, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("couldn't get deployment: %v", err)
}
pods, err := c.clientset.CoreV1().Pods(namespace).List(childCtx,
metav1.ListOptions{
LabelSelector: labels.FormatLabels(d.Spec.Selector.MatchLabels),
})
if err != nil {
return fmt.Errorf("couldn't get pods: %v", err)
}
if len(pods.Items) == 0 {
return fmt.Errorf("no pods for deployment %s", deployment)
}
for i := range pods.Items {
pod := pods.Items[i] // copy loop var so it can be referenced in the closure
egSend.Go(func() error {
readLogsErr := c.readLogs(childCtx, requestID, &egSend, &pod,
container, follow, tailLines, logs)
if readLogsErr != nil {
return fmt.Errorf("couldn't read logs on existing pods: %v", readLogsErr)
}
return nil
})
}
}
// Wait for the writes to finish, then close the logs channel, wait for the
// read goroutine to exit, and return any sendErr.
sendErr := egSend.Wait()
cancel()
wgRecv.Wait()
return sendErr
}
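
For illustration, a hedged usage sketch of the new Client.Logs API. Only the Logs signature comes from the diff above; the surrounding function, its parameter values, and the import path of the internal k8s package are assumptions:

// Package example is a hypothetical caller of the new API. In practice this
// code would have to live inside the same module, since internal packages are
// not importable from outside it.
package example

import (
    "context"
    "io"

    "github.com/uselagoon/ssh-portal/internal/k8s" // import path assumed
)

// streamDeploymentLogs follows the logs of every container in a hypothetical
// "nginx" deployment, tailing the last 100 lines of each container, and writes
// them to stdio (for example an SSH channel, which is an io.ReadWriter). With
// follow=true it returns only when ctx is cancelled or an unrecoverable error
// occurs.
func streamDeploymentLogs(ctx context.Context, c *k8s.Client,
    stdio io.ReadWriter) error {
    const (
        namespace  = "example-project-main" // hypothetical namespace
        deployment = "nginx"                // hypothetical deployment
        container  = ""                     // empty selects all containers
        follow     = true
        tailLines  = int64(100)
    )
    return c.Logs(ctx, namespace, deployment, container, follow, tailLines, stdio)
}
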
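Also for illustration, the concurrency shape of Logs() above (sending goroutines collected in an errgroup.Group, a single receiver draining a buffered channel until the shared context is cancelled, then the wait/cancel/wait shutdown order) is a reusable pattern. A stripped-down, self-contained sketch with made-up work standing in for the per-container log streams:

package main

import (
    "context"
    "fmt"
    "sync"

    "golang.org/x/sync/errgroup"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    lines := make(chan string, 4) // buffered, like the logs channel
    var egSend errgroup.Group     // sending goroutines, which can fail
    var wgRecv sync.WaitGroup     // single receiver, which cannot

    // receiver: drain the channel until the shared context is cancelled
    wgRecv.Add(1)
    go func() {
        defer wgRecv.Done()
        for {
            select {
            case msg := <-lines:
                fmt.Println(msg)
            case <-ctx.Done():
                return
            }
        }
    }()

    // senders, standing in for the per-container log streams
    for i := 0; i < 3; i++ {
        i := i // copy the loop var, as the diff does for pods and containers
        egSend.Go(func() error {
            select {
            case lines <- fmt.Sprintf("[sender-%d] hello", i):
                return nil
            case <-ctx.Done():
                return ctx.Err()
            }
        })
    }

    // shutdown order mirrors Logs(): wait for senders, cancel, then wait for
    // the receiver. Lines still buffered at cancellation may not be printed.
    err := egSend.Wait()
    cancel()
    wgRecv.Wait()
    if err != nil {
        fmt.Println("send error:", err)
    }
}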