Skip to content

Commit

Permalink
eks/cluster-loader: fix error handling
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
  • Loading branch information
gyuho committed Jun 20, 2020
1 parent 67f60d0 commit 4a9d982
Showing 1 changed file with 130 additions and 85 deletions.
215 changes: 130 additions & 85 deletions eks/cluster-loader/cluster-loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,51 +106,67 @@ func New(cfg Config) Loader {
}
}

/*
DO NOT FAIL THE TEST JUST BECAUSE IT CANNOT GET METRICS
I0620 10:48:09.278149 256 simple_test_executor.go:384] Resources cleanup time: 15.009539312s
E0620 10:48:09.278189 256 clusterloader.go:219] --------------------------------------------------------------------------------
E0620 10:48:09.278193 256 clusterloader.go:220] Test Finished
E0620 10:48:09.278196 256 clusterloader.go:221] Test: /clusterloader2-test-config.yaml
E0620 10:48:09.278199 256 clusterloader.go:222] Status: Fail
E0620 10:48:09.278202 256 clusterloader.go:224] Errors: [measurement call TestMetrics - TestMetrics error: [action start failed for SchedulingMetrics measurement: unexpected error (code: 0) in ssh connection to master: &errors.errorString{s:"error getting signer for provider : 'GetSigner(...) not implemented for '"}]
measurement call TestMetrics - TestMetrics error: [action gather failed for SchedulingMetrics measurement: unexpected error (code: 0) in ssh connection to master: &errors.errorString{s:"error getting signer for provider : 'GetSigner(...) not implemented for '"}]]
E0620 10:48:09.278206 256 clusterloader.go:226] --------------------------------------------------------------------------------
JUnit report was created: /data/eks-2020062010-exclusiver66-cluster-loader-local-report/junit.xml
F0620 10:48:09.278371 256 clusterloader.go:329] 1 tests have failed!
*/

// Start runs the cluster loader and waits for its completion.
func (ld *loader) Start() (err error) {
ld.cfg.Logger.Info("starting cluster loader")
func (ts *loader) Start() (err error) {
ts.cfg.Logger.Info("starting cluster loader")

if !fileutil.Exist(ld.cfg.TestConfigPath) {
ld.cfg.Logger.Warn("clusterloader test config file does not exist", zap.String("path", ld.cfg.TestConfigPath))
return fmt.Errorf("%q not found", ld.cfg.TestConfigPath)
if !fileutil.Exist(ts.cfg.TestConfigPath) {
ts.cfg.Logger.Warn("clusterloader test config file does not exist", zap.String("path", ts.cfg.TestConfigPath))
return fmt.Errorf("%q not found", ts.cfg.TestConfigPath)
}

if err = os.MkdirAll(ld.cfg.ReportDir, 0700); err != nil {
if err = os.MkdirAll(ts.cfg.ReportDir, 0700); err != nil {
return err
}
if err = fileutil.IsDirWriteable(ld.cfg.ReportDir); err != nil {
if err = fileutil.IsDirWriteable(ts.cfg.ReportDir); err != nil {
return err
}

if err = ld.downloadClusterLoader(); err != nil {
if err = ts.downloadClusterLoader(); err != nil {
return err
}
if err = ld.writeTestOverrides(); err != nil {
if err = ts.writeTestOverrides(); err != nil {
return err
}

args := []string{
ld.cfg.ClusterLoaderPath,
ts.cfg.ClusterLoaderPath,
"--alsologtostderr",
"--testconfig=" + ld.cfg.TestConfigPath,
"--testoverrides=" + ld.testOverridesPath,
"--report-dir=" + ld.cfg.ReportDir,
"--nodes=" + fmt.Sprintf("%d", ld.cfg.Nodes),
"--testconfig=" + ts.cfg.TestConfigPath,
"--testoverrides=" + ts.testOverridesPath,
"--report-dir=" + ts.cfg.ReportDir,
"--nodes=" + fmt.Sprintf("%d", ts.cfg.Nodes),
}
if ld.cfg.KubeConfigPath == "" {
if ts.cfg.KubeConfigPath == "" {
// ref. https://github.com/kubernetes/perf-tests/pull/1295
args = append(args, "--run-from-cluster=true")
} else {
args = append(args, "--kubeconfig="+ld.cfg.KubeConfigPath)
args = append(args, "--kubeconfig="+ts.cfg.KubeConfigPath)
}

ld.testLogsFile, err = os.OpenFile(ld.cfg.LogPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
ts.testLogsFile, err = os.OpenFile(ts.cfg.LogPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
if err != nil {
return err
}
defer func() {
ld.testLogsFile.Sync()
ld.testLogsFile.Close()
ts.testLogsFile.Sync()
ts.testLogsFile.Close()
}()
// stream command run outputs for debugging purposes
checkDonec := make(chan struct{})
Expand All @@ -160,122 +176,140 @@ func (ld *loader) Start() (err error) {
}()
for {
select {
case <-ld.cfg.Stopc:
ld.cfg.Logger.Info("exiting cluster loader command output checks")
case <-ts.cfg.Stopc:
ts.cfg.Logger.Info("exiting cluster loader command output checks")
return
case <-ld.rootCtx.Done():
ld.cfg.Logger.Info("exiting cluster loader command output checks")
case <-ts.rootCtx.Done():
ts.cfg.Logger.Info("exiting cluster loader command output checks")
return
case <-time.After(10 * time.Second):
}

if ld.testLogsFile != nil {
ld.testLogsFile.Sync()
if ts.testLogsFile != nil {
ts.testLogsFile.Sync()
}
ld.cfg.Logger.Info("checking cluster loader command output from logs file")
b, lerr := ioutil.ReadFile(ld.cfg.LogPath)
if err != nil {
ld.cfg.Logger.Warn("failed to read cluster loader command output from logs file", zap.Error(lerr))
ts.cfg.Logger.Info("checking cluster loader command output from logs file")
b, lerr := ioutil.ReadFile(ts.cfg.LogPath)
if lerr != nil {
ts.cfg.Logger.Warn("failed to read cluster loader command output from logs file", zap.Error(lerr))
continue
}
output := strings.TrimSpace(string(b))
lines := strings.Split(output, "\n")
linesN := len(lines)

ld.cfg.Logger.Info("checked cluster loader command output from logs file", zap.Int("total-lines", linesN))
ts.cfg.Logger.Info("checked cluster loader command output from logs file", zap.Int("total-lines", linesN))
if linesN > 15 {
output = strings.Join(lines[linesN-15:], "\n")
}
fmt.Printf("\n%q output:\n%s\n\n", ld.cfg.LogPath, output)
fmt.Printf("\n%q output:\n%s\n\n", ts.cfg.LogPath, output)
}
}()

now := time.Now()
errc := make(chan error)
ld.rootCtx, ld.rootCancel = context.WithTimeout(context.Background(), ld.cfg.Timeout)
var runErr error
ts.rootCtx, ts.rootCancel = context.WithTimeout(context.Background(), ts.cfg.Timeout)
go func() {
for i := 0; i < ld.cfg.Runs; i++ {
for i := 0; i < ts.cfg.Runs; i++ {
select {
case <-ld.rootCtx.Done():
case <-ts.rootCtx.Done():
return
default:
}
if err := ld.run(i, args); err != nil {
errc <- err
if rerr := ts.run(i, args); rerr != nil {
errc <- rerr
return
}
}
errc <- nil
}()
select {
case <-ld.donec:
ld.cfg.Logger.Info("done cluster loader")
case <-ld.cfg.Stopc:
ld.cfg.Logger.Info("stopping cluster loader")
case <-ld.rootCtx.Done():
ld.cfg.Logger.Info("timed out cluster loader")
case err = <-errc:
if err == nil {
ld.cfg.Logger.Info("completed cluster loader")
case <-ts.donec:
ts.cfg.Logger.Info("done cluster loader")
case <-ts.cfg.Stopc:
ts.cfg.Logger.Info("stopping cluster loader")
case <-ts.rootCtx.Done():
ts.cfg.Logger.Info("timed out cluster loader")
case runErr = <-errc:
if runErr == nil {
ts.cfg.Logger.Info("successfully ran cluster loader",
zap.String("took", time.Since(now).String()),
zap.Int("total-runs", ts.cfg.Runs),
)
} else {
ld.cfg.Logger.Warn("failed cluster loader", zap.Error(err))
ts.cfg.Logger.Warn("failed to run cluster loader",
zap.String("took", time.Since(now).String()),
zap.Int("total-runs", ts.cfg.Runs),
zap.Error(runErr),
)
}
}
ld.rootCancel()
ts.rootCancel()
select {
case <-checkDonec:
ld.cfg.Logger.Info("confirmed exit cluster loader command output checks")
ts.cfg.Logger.Info("confirmed exit cluster loader command output checks")
case <-time.After(3 * time.Minute):
ld.cfg.Logger.Warn("took too long to confirm exit cluster loader command output checks")
ts.cfg.Logger.Warn("took too long to confirm exit cluster loader command output checks")
}
if err != nil {
ld.cfg.Logger.Warn("failed to run cluster loader", zap.Error(err))
if runErr != nil {
ts.cfg.Logger.Warn("failed to run cluster loader", zap.Error(runErr))
} else {
ld.cfg.Logger.Info("successfully ran cluster loader")
ts.cfg.Logger.Info("successfully ran cluster loader")
}

lout, lerr := ioutil.ReadFile(ts.cfg.LogPath)
if lerr != nil {
ts.cfg.Logger.Warn("failed to read cluster loader log output", zap.Error(lerr))
return lerr
}
logOutput := string(lout)
testFinishedCount := strings.Count(logOutput, `] Test Finished`)

// append results in "LogPath"
// "0777" to fix "scp: /var/log/cluster-loader-remote.log: Permission denied"
logFile, cerr := os.OpenFile(ld.cfg.LogPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0777)
logFile, cerr := os.OpenFile(ts.cfg.LogPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0777)
if cerr != nil {
return fmt.Errorf("open(%q): %v", ld.cfg.LogPath, cerr)
return fmt.Errorf("open(%q): %v", ts.cfg.LogPath, cerr)
}
defer logFile.Close()

podStartupLats := make([]measurement_util.PerfData, 0)
cerr = filepath.Walk(ld.cfg.ReportDir, func(path string, info os.FileInfo, ferr error) error {
cerr = filepath.Walk(ts.cfg.ReportDir, func(path string, info os.FileInfo, ferr error) error {
if ferr != nil {
return ferr
}
if info.IsDir() {
return nil
}
ld.cfg.Logger.Info("found report", zap.String("path", path))
ts.cfg.Logger.Info("found report", zap.String("path", path))

if strings.HasPrefix(filepath.Base(path), "PodStartupLatency_") {
ld.cfg.Logger.Info("parsing PodStartupLatency", zap.String("path", path))
ts.cfg.Logger.Info("parsing PodStartupLatency", zap.String("path", path))
p, perr := ParsePodStartupLatency(path)
if perr != nil {
ld.cfg.Logger.Warn("failed to parse PodStartupLatency", zap.String("path", path))
ts.cfg.Logger.Warn("failed to parse PodStartupLatency", zap.String("path", path))
return perr
}
ld.cfg.Logger.Info("parsed PodStartupLatency", zap.String("path", path))
ts.cfg.Logger.Info("parsed PodStartupLatency", zap.String("path", path))
podStartupLats = append(podStartupLats, p)
}

if _, werr := logFile.WriteString(fmt.Sprintf("\n\n\nreport output from %q:\n\n", path)); werr != nil {
ld.cfg.Logger.Warn("failed to write report to log file", zap.Error(werr))
ts.cfg.Logger.Warn("failed to write report to log file", zap.Error(werr))
return nil
}

b, lerr := ioutil.ReadFile(path)
if lerr != nil {
ld.cfg.Logger.Warn("failed to read cluster loader command output from logs file", zap.Error(lerr))
ts.cfg.Logger.Warn("failed to read cluster loader command output from logs file", zap.Error(lerr))
if _, werr := logFile.WriteString(fmt.Sprintf("failed to write %v", lerr)); werr != nil {
ld.cfg.Logger.Warn("failed to write report to log file", zap.Error(werr))
ts.cfg.Logger.Warn("failed to write report to log file", zap.Error(werr))
return nil
}
} else {
if _, werr := logFile.Write(b); werr != nil {
ld.cfg.Logger.Warn("failed to write report to log file", zap.Error(werr))
ts.cfg.Logger.Warn("failed to write report to log file", zap.Error(werr))
return nil
}
}
Expand All @@ -287,35 +321,52 @@ func (ld *loader) Start() (err error) {
podStartupLat := MergePodStartupLatency(podStartupLats...)
podStartupLatData, derr := json.Marshal(podStartupLat)
if derr != nil {
ld.cfg.Logger.Warn("failed to marshal PodStartupLatency", zap.Error(derr))
ts.cfg.Logger.Warn("failed to marshal PodStartupLatency", zap.Error(derr))
return derr
}
cerr = ioutil.WriteFile(ld.cfg.PodStartupLatencyOutputPath, podStartupLatData, 0600)
if cerr != nil {
ld.cfg.Logger.Warn("failed to write PodStartupLatency", zap.Error(cerr))
if cerr = ioutil.WriteFile(ts.cfg.PodStartupLatencyOutputPath, podStartupLatData, 0600); cerr != nil {
ts.cfg.Logger.Warn("failed to write PodStartupLatency", zap.Error(cerr))
return cerr
}

ld.cfg.Logger.Info("gzipping report dir", zap.String("report-dir", ld.cfg.ReportDir), zap.String("file-path", ld.cfg.ReportTarGzPath))
cerr = os.RemoveAll(ld.cfg.ReportTarGzPath)
if cerr != nil {
ld.cfg.Logger.Warn("failed to remove temp file", zap.Error(cerr))
ts.cfg.Logger.Info("gzipping report dir", zap.String("report-dir", ts.cfg.ReportDir), zap.String("file-path", ts.cfg.ReportTarGzPath))
if cerr = os.RemoveAll(ts.cfg.ReportTarGzPath); cerr != nil {
ts.cfg.Logger.Warn("failed to remove temp file", zap.Error(cerr))
return cerr
}
cerr = archiver.Archive([]string{ld.cfg.ReportDir}, ld.cfg.ReportTarGzPath)
if cerr != nil {
ld.cfg.Logger.Warn("archive failed", zap.Error(cerr))
if cerr = archiver.Archive([]string{ts.cfg.ReportDir}, ts.cfg.ReportTarGzPath); cerr != nil {
ts.cfg.Logger.Warn("archive failed", zap.Error(cerr))
return cerr
}
stat, cerr := os.Stat(ld.cfg.ReportTarGzPath)
stat, cerr := os.Stat(ts.cfg.ReportTarGzPath)
if cerr != nil {
ld.cfg.Logger.Warn("failed to os stat", zap.Error(cerr))
ts.cfg.Logger.Warn("failed to os stat", zap.Error(cerr))
return cerr
}
sz := humanize.Bytes(uint64(stat.Size()))
ld.cfg.Logger.Info("gzipped report dir", zap.String("report-dir", ld.cfg.ReportDir), zap.String("file-path", ld.cfg.ReportTarGzPath), zap.String("file-size", sz))

return err
ts.cfg.Logger.Info("gzipped report dir", zap.String("report-dir", ts.cfg.ReportDir), zap.String("file-path", ts.cfg.ReportTarGzPath), zap.String("file-size", sz))

if testFinishedCount == ts.cfg.Runs {
ts.cfg.Logger.Info("completed expected test runs; overriding error",
zap.Int("finished-count", testFinishedCount),
zap.Int("expected-runs", ts.cfg.Runs),
zap.Error(runErr),
)
runErr = nil
} else {
ts.cfg.Logger.Warn("failed to complete expected test runs",
zap.Int("finished-count", testFinishedCount),
zap.Int("expected-runs", ts.cfg.Runs),
zap.Error(runErr),
)
completeErr := fmt.Errorf("failed to complete expected test runs [expected %d, completed %d]", ts.cfg.Runs, testFinishedCount)
if runErr == nil {
runErr = completeErr
} else {
runErr = fmt.Errorf("%v (run error: %v)", completeErr, runErr)
}
}
return runErr
}

func (ld *loader) Stop() {
Expand Down Expand Up @@ -406,18 +457,12 @@ ENABLE_SYSTEM_POD_METRICS: {{ .EnableSystemPodMetrics }}

// takes about 2-minute
func (ld *loader) run(idx int, args []string) (err error) {
now := time.Now()
ld.cfg.Logger.Info("running cluster loader", zap.Int("index", idx), zap.String("command", strings.Join(args, " ")))
ctx, cancel := context.WithTimeout(ld.rootCtx, 20*time.Minute)
cmd := exec.New().CommandContext(ctx, args[0], args[1:]...)
cmd.SetStderr(ld.testLogsFile)
cmd.SetStdout(ld.testLogsFile)
err = cmd.Run()
cancel()
if err != nil {
ld.cfg.Logger.Warn("failed to run cluster loader", zap.String("took", time.Since(now).String()), zap.Error(err))
} else {
ld.cfg.Logger.Info("successfully ran cluster loader", zap.String("took", time.Since(now).String()))
}
return err
}

0 comments on commit 4a9d982

Please sign in to comment.