Remove batching on unittest thread, use historical data to inform batching #18578

Merged 4 commits on Sep 20, 2017. Changes from 3 commits shown below.
1 change: 1 addition & 0 deletions .gitignore
@@ -59,3 +59,4 @@ internal/
.idea
yarn.lock
package-lock.json
.parallelperf.json
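
Note: the newly ignored `.parallelperf.json` is the timing cache introduced by this PR. The host writes per-test durations after each run and reads them back on the next run to drive batching. A sketch of its shape, with hypothetical entries (keys come from the `hashName` helper added in host.ts below, i.e. `tsrunner-<runner>://<test>`):

```ts
// Shape of .parallelperf.json as consumed by readSavedPerfData().
// Keys come from hashName(runner, file); values are durations in milliseconds.
// The concrete entries here are hypothetical.
const examplePerfData: { [testHash: string]: number } = {
    "tsrunner-compiler://tests/cases/compiler/example1.ts": 312,
    "tsrunner-conformance://tests/cases/conformance/example2.ts": 45,
};
```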
157 changes: 120 additions & 37 deletions src/harness/parallel/host.ts
@@ -5,11 +5,10 @@ if (typeof describe === "undefined") {
namespace Harness.Parallel.Host {

interface ChildProcessPartial {
send(message: any, callback?: (error: Error) => void): boolean;
send(message: ParallelHostMessage, callback?: (error: Error) => void): boolean;
on(event: "error", listener: (err: Error) => void): this;
on(event: "exit", listener: (code: number, signal: string) => void): this;
on(event: "message", listener: (message: any) => void): this;
disconnect(): void;
on(event: "message", listener: (message: ParallelClientMessage) => void): this;
}

interface ProgressBarsOptions {
@@ -27,23 +26,54 @@ namespace Harness.Parallel.Host {
text?: string;
}

const perfdataFileName = ".parallelperf.json";
function readSavedPerfData(): {[testHash: string]: number} {
const perfDataContents = Harness.IO.readFile(perfdataFileName);
if (perfDataContents) {
return JSON.parse(perfDataContents);
}
return undefined;
}

function hashName(runner: TestRunnerKind, test: string) {
return `tsrunner-${runner}://${test}`;
}

export function start() {
initializeProgressBarsDependencies();
console.log("Discovering tests...");
const discoverStart = +(new Date());
const { statSync }: { statSync(path: string): { size: number }; } = require("fs");
const tasks: { runner: TestRunnerKind, file: string, size: number }[] = [];
let totalSize = 0;
const perfData = readSavedPerfData();
let totalCost = 0;
let unknownValue: string | undefined;
for (const runner of runners) {
const files = runner.enumerateTestFiles();
for (const file of files) {
const size = statSync(file).size;
let size: number;
if (!perfData) {
size = statSync(file).size;

}
else {
const hashedName = hashName(runner.kind(), file);
size = perfData[hashedName];
if (size === undefined) {
size = Number.MAX_SAFE_INTEGER;
unknownValue = hashedName;
}
}
tasks.push({ runner: runner.kind(), file, size });
totalSize += size;
totalCost += size;
}
}
tasks.sort((a, b) => a.size - b.size);
const batchSize = (totalSize / workerCount) * 0.9;
// 1 fewer batches than threads to account for unittests running on the final thread
const batchCount = runners.length === 1 ? workerCount : workerCount - 1;
const packfraction = 0.9;
const chunkSize = 1000; // ~1KB or 1s for sending batches near the end of a test
const batchSize = (totalCost / workerCount) * packfraction; // Keep spare tests for unittest thread in reserve
console.log(`Discovered ${tasks.length} test files in ${+(new Date()) - discoverStart}ms.`);
console.log(`Starting to run tests using ${workerCount} threads...`);
const { fork }: { fork(modulePath: string, args?: string[], options?: {}): ChildProcessPartial; } = require("child_process");
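
Note: two details above are worth making concrete. Tasks are sorted ascending by cost and later dispatched with `tasks.pop()`, so the most expensive tests run first, and any test missing from the perf cache is pinned to `Number.MAX_SAFE_INTEGER` so it sorts to the very end and is scheduled before everything else. Because the unittest thread receives no initial batch, only `workerCount - 1` batches are packed, each targeting `packfraction` of an even share. A sketch with hypothetical numbers:

```ts
// Hypothetical numbers illustrating the batch sizing above.
const workerCount = 4;
const totalCost = 400000;           // summed per-test cost (ms when perf data exists)
const packfraction = 0.9;
const batchCount = workerCount - 1; // last thread is reserved for unit tests
const batchSize = (totalCost / workerCount) * packfraction; // 90000 per batch

// Ascending sort + pop() from the end ⇒ costliest (and unprofiled) tests first.
const tasks = [{ size: 10 }, { size: 500 }, { size: Number.MAX_SAFE_INTEGER }];
tasks.sort((a, b) => a.size - b.size);
tasks.pop(); // the unprofiled test, dispatched first
```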
@@ -59,15 +89,17 @@ namespace Harness.Parallel.Host {
const progressUpdateInterval = 1 / progressBars._options.width;
let nextProgress = progressUpdateInterval;

const newPerfData: {[testHash: string]: number} = {};

const workers: ChildProcessPartial[] = [];
let closedWorkers = 0;
for (let i = 0; i < workerCount; i++) {
// TODO: Just send the config over the IPC channel or in the command line arguments
const config: TestConfig = { light: Harness.lightMode, listenForWork: true, runUnitTests: runners.length === 1 ? false : i === workerCount - 1 };
const configPath = ts.combinePaths(taskConfigsFolder, `task-config${i}.json`);
Harness.IO.writeFile(configPath, JSON.stringify(config));
const child = fork(__filename, [`--config="${configPath}"`]);
child.on("error", err => {
child.disconnect();
console.error("Unexpected error in child process:");
console.error(err);
return process.exit(2);
@@ -81,7 +113,6 @@
child.on("message", (data: ParallelClientMessage) => {
switch (data.type) {
case "error": {
child.disconnect();
console.error(`Test worker encountered unexpected error and was forced to close:
Message: ${data.payload.error}
Stack: ${data.payload.stack}`);
@@ -97,6 +128,7 @@
else {
passingFiles++;
}
newPerfData[hashName(data.payload.runner, data.payload.file)] = data.payload.duration;

const progress = (failingFiles + passingFiles) / totalFiles;
if (progress >= nextProgress) {
@@ -106,20 +138,27 @@
updateProgress(progress, errorResults.length ? `${errorResults.length} failing` : `${totalPassing} passing`, errorResults.length ? "fail" : undefined);
}

if (failingFiles + passingFiles === totalFiles) {
// Done. Finished every task and collected results.
child.send({ type: "close" });
child.disconnect();
return outputFinalResult();
}
if (tasks.length === 0) {
// No more tasks to distribute
child.send({ type: "close" });
child.disconnect();
return;
}
if (data.type === "result") {
child.send({ type: "test", payload: tasks.pop() });
if (tasks.length === 0) {
// No more tasks to distribute
child.send({ type: "close" });
closedWorkers++;
if (closedWorkers === workerCount) {
outputFinalResult();
}
return;
}
// Send tasks in blocks if the tasks are small
const taskList = [tasks.pop()];
while (tasks.length && taskList.reduce((p, c) => p + c.size, 0) > chunkSize) {
taskList.push(tasks.pop());
}
if (taskList.length === 1) {
child.send({ type: "test", payload: taskList[0] });
}
else {
child.send({ type: "batch", payload: taskList });
}
}
}
}
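
Note: a worker that reports a result is immediately handed more work, and near the tail of the run several small tasks are grouped into a single `batch` message to amortize IPC overhead. A simplified sketch of the grouping (it accumulates while the running total stays under `chunkSize`, matching the comment's stated intent of batching small tasks; note the diff as shown compares with `>` instead):

```ts
// Simplified sketch: group small tasks into one IPC payload of ~chunkSize cost.
function takeChunk(tasks: { size: number }[], chunkSize: number) {
    const taskList = [tasks.pop()!];
    while (tasks.length && taskList.reduce((sum, t) => sum + t.size, 0) < chunkSize) {
        taskList.push(tasks.pop()!);
    }
    // One task ⇒ send a "test" message; several ⇒ send a "batch" message.
    return taskList;
}
```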
@@ -130,12 +169,13 @@
// It's only really worth doing an initial batching if there are a ton of files to go through
if (totalFiles > 1000) {
console.log("Batching initial test lists...");
const batches: { runner: TestRunnerKind, file: string, size: number }[][] = new Array(workerCount);
const doneBatching = new Array(workerCount);
const batches: { runner: TestRunnerKind, file: string, size: number }[][] = new Array(batchCount);
const doneBatching = new Array(batchCount);
let scheduledTotal = 0;
batcher: while (true) {
for (let i = 0; i < workerCount; i++) {
for (let i = 0; i < batchCount; i++) {
if (tasks.length === 0) {
// TODO: This indicates a particularly suboptimal packing
console.log(`Suboptimal packing detcted: no tests remain to be stolen. Reduce packing fraction from ${packfraction} to fix.`);

Review comment (Member): typo: detected

break batcher;
}
if (doneBatching[i]) {
@@ -145,26 +185,36 @@
batches[i] = [];
}
const total = batches[i].reduce((p, c) => p + c.size, 0);
if (total >= batchSize && !doneBatching[i]) {
if (total >= batchSize) {
doneBatching[i] = true;
continue;
}
batches[i].push(tasks.pop());
const task = tasks.pop();
batches[i].push(task);
scheduledTotal += task.size;
}
for (let j = 0; j < workerCount; j++) {
for (let j = 0; j < batchCount; j++) {
if (!doneBatching[j]) {
continue;
continue batcher;
}
}
break;
}
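
Note: the switch from `continue` to `continue batcher` is the functional fix in this loop. A bare `continue` only advances the inner `j` loop, after which the unconditional `break` always ends batching in a single pass; the labeled continue restarts the outer `while` whenever any batch is still unfinished. A minimal illustration of the labeled-continue semantics:

```ts
// Minimal illustration: `continue <label>` re-enters the labeled outer loop.
const doneFlags = [false, false, false];
function fillBatchesOnce() { doneFlags.fill(true); } // hypothetical stand-in
outer: while (true) {
    fillBatchesOnce();
    for (const done of doneFlags) {
        if (!done) continue outer; // any unfinished batch ⇒ another outer pass
    }
    break; // reached only when every batch reports done
}
```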
console.log(`Batched into ${workerCount} groups with approximate total file sizes of ${Math.floor(batchSize)} bytes in each group.`);
const prefix = `Batched into ${batchCount} groups`;
if (unknownValue) {
console.log(`${prefix}. Unprofiled tests including ${unknownValue} will be run first.`);
}
else {
console.log(`${prefix} with approximate total ${perfData ? "time" : "file sizes"} of ${perfData ? ms(batchSize) : `${Math.floor(batchSize)} bytes`} in each group. (${(scheduledTotal / totalCost * 100).toFixed(1)}% of total tests batched)`);
}
for (const worker of workers) {
const action: ParallelBatchMessage = { type: "batch", payload: batches.pop() };
if (!action.payload[0]) {
throw new Error(`Tried to send invalid message ${action}`);
const payload = batches.pop();
if (payload) {
worker.send({ type: "batch", payload });
}
else { // Unittest thread - send off just one test
worker.send({ type: "test", payload: tasks.pop() });
}
worker.send(action);
}
}
else {
@@ -177,7 +227,6 @@
updateProgress(0);
let duration: number;

const ms = require("mocha/lib/ms");
function completeBar() {
const isPartitionFail = failingFiles !== 0;
const summaryColor = isPartitionFail ? "fail" : "green";
@@ -235,6 +284,8 @@
reporter.epilogue();
}

Harness.IO.writeFile(perfdataFileName, JSON.stringify(newPerfData, null, 4)); // tslint:disable-line:no-null-keyword

process.exit(errorResults.length);
}

@@ -264,6 +315,38 @@
let tty: { isatty(x: number): boolean };
let isatty: boolean;

const s = 1000;
const m = s * 60;
const h = m * 60;
const d = h * 24;
function ms(ms: number) {
let result = "";
if (ms >= d) {
const count = Math.floor(ms / d);
result += count + "d";
ms -= count * d;
}
if (ms >= h) {
const count = Math.floor(ms / h);
result += count + "h";
ms -= count * h;
}
if (ms >= m) {
const count = Math.floor(ms / m);
result += count + "m";
ms -= count * m;
}
if (ms >= s) {
const count = Math.round(ms / s);
result += count + "s";
return result;
}
if (ms > 0) {
result += Math.round(ms) + "ms";
}
return result;
}
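
Note: the inlined `ms` helper replaces the earlier `require("mocha/lib/ms")` import and formats a millisecond count as a compact duration string. A few hypothetical calls:

```ts
// Hypothetical calls to the duration formatter:
ms(450);      // "450ms"
ms(83000);    // "1m23s"
ms(90061000); // "1d1h1m1s"
```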

function initializeProgressBarsDependencies() {
Mocha = require("mocha");
Base = Mocha.reporters.Base;
@@ -286,7 +369,7 @@
const close = options.close || "]";
const complete = options.complete || "▬";
const incomplete = options.incomplete || Base.symbols.dot;
const maxWidth = Base.window.width - open.length - close.length - 30;
const maxWidth = Base.window.width - open.length - close.length - 34;
const width = minMax(options.width || maxWidth, 10, maxWidth);
this._options = {
open,
2 changes: 1 addition & 1 deletion src/harness/parallel/shared.ts
@@ -8,7 +8,7 @@ namespace Harness.Parallel {

export type ParallelErrorMessage = { type: "error", payload: { error: string, stack: string } } | never;
export type ErrorInfo = ParallelErrorMessage["payload"] & { name: string };
export type ParallelResultMessage = { type: "result", payload: { passing: number, errors: ErrorInfo[] } } | never;
export type ParallelResultMessage = { type: "result", payload: { passing: number, errors: ErrorInfo[], duration: number, runner: TestRunnerKind, file: string } } | never;
export type ParallelBatchProgressMessage = { type: "progress", payload: ParallelResultMessage["payload"] } | never;
export type ParallelClientMessage = ParallelErrorMessage | ParallelResultMessage | ParallelBatchProgressMessage;
}
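
Note: with the widened payload, a result message now carries everything the host needs to key a timing by test. A hypothetical instance:

```ts
// Hypothetical ParallelResultMessage under the new shape:
const result: ParallelResultMessage = {
    type: "result",
    payload: {
        passing: 12,
        errors: [],
        duration: 340,  // ms, measured by the worker
        runner: "compiler",
        file: "tests/cases/compiler/example.ts",
    },
};
```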
13 changes: 10 additions & 3 deletions src/harness/parallel/worker.ts
@@ -12,9 +12,10 @@ namespace Harness.Parallel.Worker {
testList.length = 0;
}
reportedUnitTests = true;
const start = +(new Date());
runner.initializeTests();
testList.forEach(({ name, callback, kind }) => executeCallback(name, callback, kind));
return { errors, passing };
return { errors, passing, duration: +(new Date()) - start };
}


@@ -172,7 +173,13 @@ namespace Harness.Parallel.Worker {
});
process.on("uncaughtException", error => {
const message: ParallelErrorMessage = { type: "error", payload: { error: error.message, stack: error.stack } };
process.send(message);
try {
process.send(message);
}
catch (e) {
console.error(error);
throw error;
}
});
if (!runUnitTests) {
// ensure unit tests do not get run
@@ -189,7 +196,7 @@
}
const instance = runners.get(runner);
instance.tests = [file];
return resetShimHarnessAndExecute(instance);
return { ...resetShimHarnessAndExecute(instance), runner, file };
}
}
}
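
Note: taken together with the host changes, the timing loop closes as follows: the worker times each file and spreads `runner` and `file` into its result, the host records `payload.duration` under `hashName(runner, file)`, and `outputFinalResult` writes the map to `.parallelperf.json` for the next run's batching. A condensed sketch of the worker side (with a hypothetical `getRunnerInstance` helper):

```ts
// Condensed sketch of the worker side of the timing round trip.
// getRunnerInstance is a hypothetical stand-in for the worker's runner lookup.
function runFile(runner: TestRunnerKind, file: string) {
    const instance = getRunnerInstance(runner);
    instance.tests = [file];
    // resetShimHarnessAndExecute now returns { errors, passing, duration }:
    const payload = { ...resetShimHarnessAndExecute(instance), runner, file };
    process.send!({ type: "result", payload });
}
// Host, on "result": newPerfData[hashName(payload.runner, payload.file)] = payload.duration;
```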