-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
default-tasks-runner.ts
121 lines (108 loc) · 3.12 KB
/
default-tasks-runner.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import { Observable } from 'rxjs';
import {
AffectedEventType,
Task,
TaskCompleteEvent,
TasksRunner,
} from './tasks-runner';
import { ProjectGraph } from '../core/project-graph';
import { NxJson } from '../core/shared-interfaces';
import { TaskOrderer } from './task-orderer';
import { TaskOrchestrator } from './task-orchestrator';
/**
 * Shape of a remote cache backend that can be handed to the runner through
 * `DefaultTasksRunnerOptions.remoteCache`.
 *
 * NOTE(review): this file only declares the contract and never calls it. The
 * meaning of the resolved boolean (presumably "entry found" / "stored
 * successfully") must be confirmed against the code that invokes these methods.
 */
export interface RemoteCache {
// Asynchronously handles a lookup for `hash`; `cacheDirectory` is presumably the local cache folder to populate — TODO confirm with the caller.
retrieve: (hash: string, cacheDirectory: string) => Promise<boolean>;
// Asynchronously handles a store request for `hash`; `cacheDirectory` is presumably the local folder holding the artifacts — TODO confirm with the caller.
store: (hash: string, cacheDirectory: string) => Promise<boolean>;
}
/**
 * Hooks that bracket a task: one for its start and one for its end.
 *
 * NOTE(review): nothing in this file invokes the hooks; the options object
 * carrying the life cycle is passed on to `TaskOrchestrator`, which presumably
 * calls them — confirm there.
 */
export interface LifeCycle {
// Hook for the start of `task`.
startTask(task: Task): void;
// Hook for the end of `task`; `code` is presumably the task's exit code — TODO confirm.
endTask(task: Task, code: number): void;
}
/**
 * Life-cycle implementation whose hooks deliberately do nothing.
 *
 * `defaultTasksRunner` installs an instance of this class when the caller did
 * not provide `options.lifeCycle`, so the option is always set afterwards.
 */
class NoopLifeCycle implements LifeCycle {
  /** No-op: accepts the task and returns without side effects. */
  startTask(task: Task): void {
    // Intentionally empty.
  }

  /** No-op: accepts the task and code and returns without side effects. */
  endTask(task: Task, code: number): void {
    // Intentionally empty.
  }
}
/**
 * Options accepted by `defaultTasksRunner`.
 *
 * Within this file the object is forwarded as-is to `TaskOrderer` and
 * `TaskOrchestrator`; the only field read directly here is `lifeCycle`.
 *
 * NOTE(review): every other field is consumed outside this file, so its exact
 * semantics cannot be confirmed from here — check the consumers before relying
 * on the names alone.
 */
export interface DefaultTasksRunnerOptions {
parallel?: boolean;
maxParallel?: number;
cacheableOperations?: string[];
cacheableTargets?: string[];
runtimeCacheInputs?: string[];
strictlyOrderedTargets?: string[];
cacheDirectory?: string;
// Optional remote cache backend (see the `RemoteCache` interface).
remoteCache?: RemoteCache;
// Optional task hooks; `defaultTasksRunner` assigns a no-op implementation when this is missing.
lifeCycle?: LifeCycle;
captureStderr?: boolean;
skipNxCache?: boolean;
}
/**
 * Default tasks runner.
 *
 * Returns an Observable that, once subscribed, runs every task via
 * `runAllTasks`, emits each resulting completion event in order, and then
 * completes.
 *
 * Side effects:
 * - `options.lifeCycle` is set to a `NoopLifeCycle` on the passed-in object
 *   when the caller did not supply one.
 * - If running the tasks (or emitting the events) fails, the error is logged
 *   and the process exits with code 1.
 *
 * @param tasks   Tasks to run.
 * @param options Runner options; forwarded to `runAllTasks`.
 * @param context Target name, optional initiating project, project graph and
 *                nx.json contents; forwarded to `runAllTasks`.
 * @returns Observable of task completion events.
 */
export const defaultTasksRunner: TasksRunner<DefaultTasksRunnerOptions> = (
  tasks: Task[],
  options: DefaultTasksRunnerOptions,
  context: {
    target: string;
    initiatingProject?: string;
    projectGraph: ProjectGraph;
    nxJson: NxJson;
  }
): Observable<TaskCompleteEvent> => {
  // Make sure there is always a life-cycle object on the options.
  if (!options.lifeCycle) {
    options.lifeCycle = new NoopLifeCycle();
  }

  return new Observable((subscriber) => {
    const runAndEmit = async (): Promise<void> => {
      try {
        const events = await runAllTasks(tasks, options, context);
        events.forEach((event) => subscriber.next(event));
      } catch (e) {
        console.error('Unexpected error:');
        console.error(e);
        process.exit(1);
      } finally {
        subscriber.complete();
        // fix for https://github.com/nrwl/nx/issues/1666
        const stdin = process.stdin as any;
        if (stdin['unref']) stdin.unref();
      }
    };

    // Fire and forget: every outcome is handled inside runAndEmit itself.
    runAndEmit();
  });
};
/**
 * Runs the given tasks stage by stage and collects one status per task.
 *
 * The tasks are first split into ordered stages by `TaskOrderer`. Each stage is
 * executed through `TaskOrchestrator.run` and awaited before the next stage
 * starts. As soon as a stage contains an unsuccessful status, no further stage
 * is executed: every task of the remaining stages is reported with
 * `success: false` and the function returns.
 *
 * @param tasks   Tasks to run.
 * @param options Runner options, passed to both the orderer and the orchestrator.
 * @param context Target name, optional initiating project, project graph and
 *                nx.json contents.
 * @returns Statuses of all tasks: executed ones first (in stage order),
 *          followed by the skipped ones, if any.
 */
async function runAllTasks(
  tasks: Task[],
  options: DefaultTasksRunnerOptions,
  context: {
    target: string;
    initiatingProject?: string;
    projectGraph: ProjectGraph;
    nxJson: NxJson;
  }
): Promise<Array<{ task: Task; type: any; success: boolean }>> {
  const stages = new TaskOrderer(
    options,
    context.target,
    context.projectGraph
  ).splitTasksIntoStages(tasks);

  const orchestrator = new TaskOrchestrator(
    context.initiatingProject,
    context.projectGraph,
    options
  );

  const res = [];
  for (let i = 0; i < stages.length; ++i) {
    const statuses = await orchestrator.run(stages[i]);
    res.push(...statuses);

    // Any failed task means the later stages must be skipped.
    if (statuses.some((s) => !s.success)) {
      // `slice` rather than `splice`: the remaining stages are only read, so
      // the array returned by the orderer is left untouched.
      res.push(...markStagesAsNotSuccessful(stages.slice(i + 1)));
      break;
    }
  }
  return res;
}
/**
 * Flattens the given stages into a single list of statuses, one per task, each
 * marked with `success: false`. Stage order and task order are preserved.
 *
 * @param stages Stages (arrays of tasks) to report as unsuccessful.
 * @returns Flat array of unsuccessful statuses; empty when `stages` is empty.
 */
function markStagesAsNotSuccessful(stages: Task[][]) {
  const failed = [];
  for (const stage of stages) {
    for (const status of tasksToStatuses(stage, false)) {
      failed.push(status);
    }
  }
  return failed;
}
/**
 * Wraps each task in a status object of type
 * `AffectedEventType.TaskComplete` carrying the given success flag.
 *
 * @param tasks   Tasks to convert.
 * @param success Success flag copied onto every status.
 * @returns One status per task, in the same order as `tasks`.
 */
function tasksToStatuses(tasks: Task[], success: boolean) {
  const toStatus = (task: Task) => {
    return {
      task: task,
      type: AffectedEventType.TaskComplete,
      success: success,
    };
  };
  return tasks.map(toStatus);
}
// Also expose the runner as this module's default export (in addition to the named export).
export default defaultTasksRunner;