Skip to content

Commit

Permalink
Remove batching on unittest thread, use historical data to inform bat…
Browse files Browse the repository at this point in the history
…ching (#18578)

* Remove batching on unittest thread

* Batch more things, improve output, use past test perf as a better heuristic for future test runs

* Fix merge side effect

* Fix typo
  • Loading branch information
weswigham authored Sep 20, 2017
1 parent b9b1127 commit 7dec4ae
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 41 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ internal/
.idea
yarn.lock
package-lock.json
.parallelperf.json
157 changes: 120 additions & 37 deletions src/harness/parallel/host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ if (typeof describe === "undefined") {
namespace Harness.Parallel.Host {

interface ChildProcessPartial {
send(message: any, callback?: (error: Error) => void): boolean;
send(message: ParallelHostMessage, callback?: (error: Error) => void): boolean;
on(event: "error", listener: (err: Error) => void): this;
on(event: "exit", listener: (code: number, signal: string) => void): this;
on(event: "message", listener: (message: any) => void): this;
disconnect(): void;
on(event: "message", listener: (message: ParallelClientMessage) => void): this;
}

interface ProgressBarsOptions {
Expand All @@ -27,23 +26,54 @@ namespace Harness.Parallel.Host {
text?: string;
}

const perfdataFileName = ".parallelperf.json";
/**
 * Loads the saved per-test timing data from the previous run, if present.
 * Returns undefined when no perf file exists (first run or after a clean).
 */
function readSavedPerfData(): {[testHash: string]: number} {
    const raw = Harness.IO.readFile(perfdataFileName);
    return raw ? JSON.parse(raw) : undefined;
}

/** Builds the stable key used to look up a test's historical duration in the perf file. */
function hashName(runner: TestRunnerKind, test: string) {
    return "tsrunner-" + runner + "://" + test;
}

export function start() {
initializeProgressBarsDependencies();
console.log("Discovering tests...");
const discoverStart = +(new Date());
const { statSync }: { statSync(path: string): { size: number }; } = require("fs");
const tasks: { runner: TestRunnerKind, file: string, size: number }[] = [];
let totalSize = 0;
const perfData = readSavedPerfData();
let totalCost = 0;
let unknownValue: string | undefined;
for (const runner of runners) {
const files = runner.enumerateTestFiles();
for (const file of files) {
const size = statSync(file).size;
let size: number;
if (!perfData) {
size = statSync(file).size;

}
else {
const hashedName = hashName(runner.kind(), file);
size = perfData[hashedName];
if (size === undefined) {
size = Number.MAX_SAFE_INTEGER;
unknownValue = hashedName;
}
}
tasks.push({ runner: runner.kind(), file, size });
totalSize += size;
totalCost += size;
}
}
tasks.sort((a, b) => a.size - b.size);
const batchSize = (totalSize / workerCount) * 0.9;
// 1 fewer batches than threads to account for unittests running on the final thread
const batchCount = runners.length === 1 ? workerCount : workerCount - 1;
const packfraction = 0.9;
const chunkSize = 1000; // ~1KB or 1s for sending batches near the end of a test
const batchSize = (totalCost / workerCount) * packfraction; // Keep spare tests for unittest thread in reserve
console.log(`Discovered ${tasks.length} test files in ${+(new Date()) - discoverStart}ms.`);
console.log(`Starting to run tests using ${workerCount} threads...`);
const { fork }: { fork(modulePath: string, args?: string[], options?: {}): ChildProcessPartial; } = require("child_process");
Expand All @@ -59,15 +89,17 @@ namespace Harness.Parallel.Host {
const progressUpdateInterval = 1 / progressBars._options.width;
let nextProgress = progressUpdateInterval;

const newPerfData: {[testHash: string]: number} = {};

const workers: ChildProcessPartial[] = [];
let closedWorkers = 0;
for (let i = 0; i < workerCount; i++) {
// TODO: Just send the config over the IPC channel or in the command line arguments
const config: TestConfig = { light: Harness.lightMode, listenForWork: true, runUnitTests: runners.length === 1 ? false : i === workerCount - 1 };
const configPath = ts.combinePaths(taskConfigsFolder, `task-config${i}.json`);
Harness.IO.writeFile(configPath, JSON.stringify(config));
const child = fork(__filename, [`--config="${configPath}"`]);
child.on("error", err => {
child.disconnect();
console.error("Unexpected error in child process:");
console.error(err);
return process.exit(2);
Expand All @@ -81,7 +113,6 @@ namespace Harness.Parallel.Host {
child.on("message", (data: ParallelClientMessage) => {
switch (data.type) {
case "error": {
child.disconnect();
console.error(`Test worker encounted unexpected error and was forced to close:
Message: ${data.payload.error}
Stack: ${data.payload.stack}`);
Expand All @@ -97,6 +128,7 @@ namespace Harness.Parallel.Host {
else {
passingFiles++;
}
newPerfData[hashName(data.payload.runner, data.payload.file)] = data.payload.duration;

const progress = (failingFiles + passingFiles) / totalFiles;
if (progress >= nextProgress) {
Expand All @@ -106,20 +138,27 @@ namespace Harness.Parallel.Host {
updateProgress(progress, errorResults.length ? `${errorResults.length} failing` : `${totalPassing} passing`, errorResults.length ? "fail" : undefined);
}

if (failingFiles + passingFiles === totalFiles) {
// Done. Finished every task and collected results.
child.send({ type: "close" });
child.disconnect();
return outputFinalResult();
}
if (tasks.length === 0) {
// No more tasks to distribute
child.send({ type: "close" });
child.disconnect();
return;
}
if (data.type === "result") {
child.send({ type: "test", payload: tasks.pop() });
if (tasks.length === 0) {
// No more tasks to distribute
child.send({ type: "close" });
closedWorkers++;
if (closedWorkers === workerCount) {
outputFinalResult();
}
return;
}
// Send tasks in blocks if the tasks are small
const taskList = [tasks.pop()];
while (tasks.length && taskList.reduce((p, c) => p + c.size, 0) > chunkSize) {
taskList.push(tasks.pop());
}
if (taskList.length === 1) {
child.send({ type: "test", payload: taskList[0] });
}
else {
child.send({ type: "batch", payload: taskList });
}
}
}
}
Expand All @@ -130,12 +169,13 @@ namespace Harness.Parallel.Host {
// It's only really worth doing an initial batching if there are a ton of files to go through
if (totalFiles > 1000) {
console.log("Batching initial test lists...");
const batches: { runner: TestRunnerKind, file: string, size: number }[][] = new Array(workerCount);
const doneBatching = new Array(workerCount);
const batches: { runner: TestRunnerKind, file: string, size: number }[][] = new Array(batchCount);
const doneBatching = new Array(batchCount);
let scheduledTotal = 0;
batcher: while (true) {
for (let i = 0; i < workerCount; i++) {
for (let i = 0; i < batchCount; i++) {
if (tasks.length === 0) {
// TODO: This indicates a particularly suboptimal packing
console.log(`Suboptimal packing detected: no tests remain to be stolen. Reduce packing fraction from ${packfraction} to fix.`);
break batcher;
}
if (doneBatching[i]) {
Expand All @@ -145,26 +185,36 @@ namespace Harness.Parallel.Host {
batches[i] = [];
}
const total = batches[i].reduce((p, c) => p + c.size, 0);
if (total >= batchSize && !doneBatching[i]) {
if (total >= batchSize) {
doneBatching[i] = true;
continue;
}
batches[i].push(tasks.pop());
const task = tasks.pop();
batches[i].push(task);
scheduledTotal += task.size;
}
for (let j = 0; j < workerCount; j++) {
for (let j = 0; j < batchCount; j++) {
if (!doneBatching[j]) {
continue;
continue batcher;
}
}
break;
}
console.log(`Batched into ${workerCount} groups with approximate total file sizes of ${Math.floor(batchSize)} bytes in each group.`);
const prefix = `Batched into ${batchCount} groups`;
if (unknownValue) {
console.log(`${prefix}. Unprofiled tests including ${unknownValue} will be run first.`);
}
else {
console.log(`${prefix} with approximate total ${perfData ? "time" : "file sizes"} of ${perfData ? ms(batchSize) : `${Math.floor(batchSize)} bytes`} in each group. (${(scheduledTotal / totalCost * 100).toFixed(1)}% of total tests batched)`);
}
for (const worker of workers) {
const action: ParallelBatchMessage = { type: "batch", payload: batches.pop() };
if (!action.payload[0]) {
throw new Error(`Tried to send invalid message ${action}`);
const payload = batches.pop();
if (payload) {
worker.send({ type: "batch", payload });
}
else { // Unittest thread - send off just one test
worker.send({ type: "test", payload: tasks.pop() });
}
worker.send(action);
}
}
else {
Expand All @@ -177,7 +227,6 @@ namespace Harness.Parallel.Host {
updateProgress(0);
let duration: number;

const ms = require("mocha/lib/ms");
function completeBar() {
const isPartitionFail = failingFiles !== 0;
const summaryColor = isPartitionFail ? "fail" : "green";
Expand Down Expand Up @@ -235,6 +284,8 @@ namespace Harness.Parallel.Host {
reporter.epilogue();
}

Harness.IO.writeFile(perfdataFileName, JSON.stringify(newPerfData, null, 4)); // tslint:disable-line:no-null-keyword

process.exit(errorResults.length);
}

Expand Down Expand Up @@ -264,6 +315,38 @@ namespace Harness.Parallel.Host {
let tty: { isatty(x: number): boolean };
let isatty: boolean;

const s = 1000;
const m = s * 60;
const h = m * 60;
const d = h * 24;
/**
 * Formats a millisecond duration as a compact string, e.g. 90061000 -> "1d1h1m1s".
 * Whole days/hours/minutes are floored; the trailing seconds (or milliseconds)
 * component is rounded. Returns "" for zero or negative input.
 */
function ms(ms: number) {
    let result = "";
    // Peel off each whole-unit component from largest to smallest.
    const units: [number, string][] = [[d, "d"], [h, "h"], [m, "m"]];
    for (const [size, suffix] of units) {
        if (ms >= size) {
            const count = Math.floor(ms / size);
            result += count + suffix;
            ms -= count * size;
        }
    }
    if (ms >= s) {
        // Seconds are the last printed unit when present; remainder is rounded away.
        return result + Math.round(ms / s) + "s";
    }
    if (ms > 0) {
        result += Math.round(ms) + "ms";
    }
    return result;
}

function initializeProgressBarsDependencies() {
Mocha = require("mocha");
Base = Mocha.reporters.Base;
Expand All @@ -286,7 +369,7 @@ namespace Harness.Parallel.Host {
const close = options.close || "]";
const complete = options.complete || "▬";
const incomplete = options.incomplete || Base.symbols.dot;
const maxWidth = Base.window.width - open.length - close.length - 30;
const maxWidth = Base.window.width - open.length - close.length - 34;
const width = minMax(options.width || maxWidth, 10, maxWidth);
this._options = {
open,
Expand Down
2 changes: 1 addition & 1 deletion src/harness/parallel/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Harness.Parallel {

export type ParallelErrorMessage = { type: "error", payload: { error: string, stack: string } } | never;
export type ErrorInfo = ParallelErrorMessage["payload"] & { name: string };
export type ParallelResultMessage = { type: "result", payload: { passing: number, errors: ErrorInfo[] } } | never;
export type ParallelResultMessage = { type: "result", payload: { passing: number, errors: ErrorInfo[], duration: number, runner: TestRunnerKind, file: string } } | never;
export type ParallelBatchProgressMessage = { type: "progress", payload: ParallelResultMessage["payload"] } | never;
export type ParallelClientMessage = ParallelErrorMessage | ParallelResultMessage | ParallelBatchProgressMessage;
}
13 changes: 10 additions & 3 deletions src/harness/parallel/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ namespace Harness.Parallel.Worker {
testList.length = 0;
}
reportedUnitTests = true;
const start = +(new Date());
runner.initializeTests();
testList.forEach(({ name, callback, kind }) => executeCallback(name, callback, kind));
return { errors, passing };
return { errors, passing, duration: +(new Date()) - start };
}


Expand Down Expand Up @@ -172,7 +173,13 @@ namespace Harness.Parallel.Worker {
});
process.on("uncaughtException", error => {
const message: ParallelErrorMessage = { type: "error", payload: { error: error.message, stack: error.stack } };
process.send(message);
try {
process.send(message);
}
catch (e) {
console.error(error);
throw error;
}
});
if (!runUnitTests) {
// ensure unit tests do not get run
Expand All @@ -189,7 +196,7 @@ namespace Harness.Parallel.Worker {
}
const instance = runners.get(runner);
instance.tests = [file];
return resetShimHarnessAndExecute(instance);
return { ...resetShimHarnessAndExecute(instance), runner, file };
}
}
}

0 comments on commit 7dec4ae

Please sign in to comment.