Skip to content

Commit

Permalink
Internal change (diffbased).
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 700461488
  • Loading branch information
EtiennePerot authored and gvisor-bot committed Nov 27, 2024
1 parent ac42faf commit 936bd4b
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 21 deletions.
5 changes: 4 additions & 1 deletion test/kubernetes/benchmarks/abslbuild.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"path"
"strings"
"testing"
"time"

k8s "gvisor.dev/gvisor/test/kubernetes"
"gvisor.dev/gvisor/test/kubernetes/benchmarks/profiling"
Expand Down Expand Up @@ -118,7 +119,9 @@ func BuildABSL(ctx context.Context, t *testing.T, k8sCtx k8sctx.KubernetesContex
}
defer cluster.DeletePod(ctx, pod)

containerDuration, err := benchmetric.GetTimedContainerDuration(ctx, cluster, pod, name)
waitDeadlineCtx, cancel := context.WithTimeout(ctx, 30*time.Minute)
containerDuration, err := benchmetric.GetTimedContainerDuration(waitDeadlineCtx, cluster, pod, name)
cancel()
if err != nil {
t.Fatalf("Failed to get container duration: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions test/kubernetes/benchmarks/profiling/profiling.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,10 @@ func MaybeSetup(ctx context.Context, t *testing.T, k8sCtx k8sctx.KubernetesConte
defer setupCancel()
ds, err := startOperations(setupCtx, k8sCtx, c, ns, setupCommands)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to start profiling setup operations: %w", err)
}
if err := c.WaitForDaemonset(setupCtx, ds); err != nil {
return nil, err
return nil, fmt.Errorf("failed to wait for daemonset: %w", err)
}
}
return cleanup, nil
Expand Down
10 changes: 10 additions & 0 deletions test/kubernetes/benchmarks/pytorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,16 @@ var (
mode: eager,
},
}

// AllTests maps each PyTorch test suite's name to its slice of pytorchTest
// configurations. Every key is the same string as the identifier of the
// package-level variable used as its value.
AllTests = map[string][]pytorchTest{
"FastNLPBert": FastNLPBert,
"BigBird": BigBird,
"SpeechTransformer": SpeechTransformer,
"LearningToPaint": LearningToPaint,
"MobileNetV2": MobileNetV2,
"BackgroundMatting": BackgroundMatting,
}
)

// Name returns the name of the test with the argument parameters included. It is formatted so
Expand Down
2 changes: 1 addition & 1 deletion test/kubernetes/benchmetric/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ load("//tools:defs.bzl", "go_library")

package(
default_applicable_licenses = ["//:license"],
default_visibility = ["//test/kubernetes:__subpackages__"],
default_visibility = ["//:sandbox"],
licenses = ["notice"],
)

Expand Down
12 changes: 12 additions & 0 deletions test/kubernetes/benchmetric/benchmetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,20 @@ var (
recorderOnce sync.Once
)

// recorderContextKeyType is the unexported type of the context key under
// which a `Recorder` is stored. Because the type is private to this package,
// values of other key types can never compare equal to it.
type recorderContextKeyType int

// recorderContextKey is the context key used by WithRecorder to attach a
// `Recorder` to a context and by GetRecorder to look it up again.
const recorderContextKey recorderContextKeyType = iota

// WithRecorder derives a child context from `ctx` that carries `recorder`.
// The recorder is stored under the package-private recorderContextKey, which
// GetRecorder consults before falling back to the singleton recorder.
func WithRecorder(ctx context.Context, recorder Recorder) context.Context {
	recorderCtx := context.WithValue(ctx, recorderContextKey, recorder)
	return recorderCtx
}

// GetRecorder returns the benchmark's `Recorder` singleton.
func GetRecorder(ctx context.Context) (Recorder, error) {
if ctx.Value(recorderContextKey) != nil {
return ctx.Value(recorderContextKey).(Recorder), nil
}
recorderOnce.Do(func() {
recorder, recorderErr = recorderFn(ctx)
})
Expand Down
1 change: 1 addition & 0 deletions test/kubernetes/testcluster/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//visibility:public",
],
deps = [
"//pkg/log",
"//pkg/sync",
"//test/kubernetes:test_range_config_go_proto",
"@io_k8s_api//apps/v1:go_default_library",
Expand Down
71 changes: 54 additions & 17 deletions test/kubernetes/testcluster/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"golang.org/x/sync/errgroup"
cspb "google.golang.org/genproto/googleapis/container/v1"
"gvisor.dev/gvisor/pkg/log"
"gvisor.dev/gvisor/pkg/sync"
testpb "gvisor.dev/gvisor/test/kubernetes/test_range_config_go_proto"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -437,22 +438,38 @@ func (t *TestCluster) doWaitForPod(ctx context.Context, pod *v13.Pod, phase v13.
if err != nil {
return fmt.Errorf("watch: %w", err)
}
podLogger := log.BasicRateLimitedLogger(5 * time.Minute)
incompatibleTypeLogger := log.BasicRateLimitedLogger(5 * time.Minute)
startLogTime := time.Now().Add(3 * time.Minute)

var p *v13.Pod
gotIncompatibleType := false
pollCh := time.NewTicker(10 * time.Second)
defer pollCh.Stop()
pollLoop:
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-pollCh.C:
p, err = t.GetPod(ctx, pod)
if err != nil {
return fmt.Errorf("failed to poll pod: %w", err)
}
case e := <-w.ResultChan():
var ok bool
p, ok = e.Object.(*v13.Pod)
if !ok {
return fmt.Errorf("invalid object watched: %T", p)
}
case <-time.After(10 * time.Second):
p, err = t.GetPod(ctx, pod)
if err != nil {
return fmt.Errorf("failed to poll pod: %w", err)
if !gotIncompatibleType {
log.Warningf("Received unexpected type of watched pod: got %T (%v), expected %T; falling back to polling-based wait.", e.Object, e.Object, p)
gotIncompatibleType = true
pollCh = time.NewTicker(250 * time.Millisecond)
defer pollCh.Stop()
} else {
incompatibleTypeLogger.Infof("Received another unexpected type of watched pod: got %T (%v), expected %T.", e.Object, e.Object, p)
}
time.Sleep(10 * time.Millisecond) // Avoid busy-looping when `w.ResultChan()` is closed.
continue pollLoop
}
}
if ctx.Err() != nil {
Expand All @@ -474,6 +491,9 @@ func (t *TestCluster) doWaitForPod(ctx context.Context, pod *v13.Pod, phase v13.
case phase:
return nil
}
if time.Now().After(startLogTime) {
podLogger.Infof("Still waiting for pod %q after %v; pod status: %v", pod.GetName(), time.Since(startLogTime), p.Status)
}
}
}

Expand Down Expand Up @@ -721,27 +741,44 @@ func (t *TestCluster) WaitForDaemonset(ctx context.Context, ds *appsv1.DaemonSet
defer w.Stop()
var lastDS *appsv1.DaemonSet

for daemonSetReady := false; !daemonSetReady; {
// Wait up to 15 seconds before the context deadline.
// We don't wait all the way up to the context deadline, because we
// want to have enough time to do a manual poll after the watch loop
// in order to get the current state of the DaemonSet and provide a
// better error message in case the API server is down or slow.
const watchTimeoutEarly = 15 * time.Second
readyCtx := ctx
ctxDeadline, hasDeadline := readyCtx.Deadline()
if hasDeadline && ctxDeadline.Sub(time.Now()) > watchTimeoutEarly {
var cancel context.CancelFunc
readyCtx, cancel = context.WithDeadline(ctx, ctxDeadline.Add(-watchTimeoutEarly))
defer cancel()
}

watchLoop:
for {
select {
case <-ctx.Done():
if lastDS != nil {
return fmt.Errorf("context canceled before healthy; last DaemonSet status: %#v", lastDS.Status)
}
return fmt.Errorf("context canceled before healthy")
case <-readyCtx.Done():
// Give up the watch loop. Most likely the API server is down or the
// DaemonSet can't schedule. Either way, the poll-based loop below
// will provide a better error message.
log.Infof("Watching DaemonSet %q: watch loop timed out (%v); falling back to poll-based loop.", ds.GetName(), readyCtx.Err())
break watchLoop
case e, ok := <-w.ResultChan():
d, ok := e.Object.(*appsv1.DaemonSet)
if !ok {
return fmt.Errorf("invalid object type: %T", d)
}
lastDS = d
if d.Status.NumberReady == d.Status.DesiredNumberScheduled && d.Status.DesiredNumberScheduled > 0 && d.Status.NumberUnavailable == 0 {
daemonSetReady = true
break watchLoop
}
}
}

// Now wait for the pods to be running.
for ctx.Err() == nil {
// Poll-based loop to wait for the DaemonSet to be ready.
var lastBadPod v13.Pod
for {
pods, err := t.GetPodsInDaemonSet(ctx, ds)
if err != nil {
return fmt.Errorf("failed to get pods in daemonset: %v", err)
Expand All @@ -755,6 +792,7 @@ func (t *TestCluster) WaitForDaemonset(ctx context.Context, ds *appsv1.DaemonSet
case v13.PodRunning, v13.PodSucceeded:
// OK, do nothing.
default:
lastBadPod = pod
allOK = false
}
}
Expand All @@ -763,11 +801,10 @@ func (t *TestCluster) WaitForDaemonset(ctx context.Context, ds *appsv1.DaemonSet
}
select {
case <-ctx.Done():
return ctx.Err()
return fmt.Errorf("context expired waiting for daemonset %q: %w; last bad pod: %v", ds.GetName(), ctx.Err(), lastBadPod)
case <-time.After(100 * time.Millisecond):
}
}
return nil
}

// StreamDaemonSetLogs streams the contents of a container from the given
Expand Down

0 comments on commit 936bd4b

Please sign in to comment.