Split a project into multiple chunks and parallelize
Reviewed By: evanyeung

Differential Revision: D56339821

fbshipit-source-id: 3ee092e5d3b513b35802efd27538a25e0f6778c0
tyao1 authored and facebook-github-bot committed May 2, 2024
1 parent 3a24702 commit f7b030e
Showing 4 changed files with 203 additions and 16 deletions.
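In outline: get_ir_definition_references computes which definitions each operation or fragment references, a petgraph union-find groups the IR into connected components of that dependency graph, build_raw_program_chunks packs the components into chunks of roughly MIN_CHUNK_SIZE definitions, build_programs validates and transforms the chunks in parallel with rayon, and build_project merges the per-chunk Programs after generating artifacts.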
52 changes: 52 additions & 0 deletions compiler/crates/dependency-analyzer/src/ir.rs
@@ -305,3 +305,55 @@ fn add_descendants(
        }
    }
}

/// Get the fragment references of each executable definition, including
/// root fragments required by resolver-backed fields.
pub fn get_ir_definition_references<'a>(
    schema: &SDLSchema,
    definitions: impl IntoIterator<Item = &'a ExecutableDefinition>,
) -> ExecutableDefinitionNameMap<ExecutableDefinitionNameSet> {
    let mut result: ExecutableDefinitionNameMap<ExecutableDefinitionNameSet> = Default::default();
    for definition in definitions {
        let name = definition.name_with_location().item;
        let name = match definition {
            ExecutableDefinition::Operation(_) => OperationDefinitionName(name).into(),
            ExecutableDefinition::Fragment(_) => FragmentDefinitionName(name).into(),
        };
        // Walk the selection tree with an explicit stack instead of recursion.
        let mut selections: Vec<_> = match definition {
            ExecutableDefinition::Operation(definition) => &definition.selections,
            ExecutableDefinition::Fragment(definition) => &definition.selections,
        }
        .iter()
        .collect();
        let mut references: ExecutableDefinitionNameSet = Default::default();
        while let Some(selection) = selections.pop() {
            match selection {
                Selection::FragmentSpread(selection) => {
                    references.insert(selection.fragment.item.into());
                }
                Selection::LinkedField(selection) => {
                    // Fields backed by resolvers may depend on a root fragment.
                    if let Some(fragment_name) = get_resolver_fragment_dependency_name(
                        schema.field(selection.definition.item),
                    ) {
                        references.insert(fragment_name.into());
                    }
                    selections.extend(&selection.selections);
                }
                Selection::InlineFragment(selection) => {
                    selections.extend(&selection.selections);
                }
                Selection::Condition(selection) => {
                    selections.extend(&selection.selections);
                }
                Selection::ScalarField(selection) => {
                    if let Some(fragment_name) = get_resolver_fragment_dependency_name(
                        schema.field(selection.definition.item),
                    ) {
                        references.insert(fragment_name.into());
                    }
                }
            }
        }
        result.insert(name, references);
    }
    result
}
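For illustration, a minimal standalone sketch of the same stack-based walk (the Selection type here is a toy stand-in, not Relay's IR): pushing child selections onto an explicit worklist avoids recursing over arbitrarily deep selection trees.

// Toy model of the worklist traversal in get_ir_definition_references.
enum Selection {
    FragmentSpread(String),
    LinkedField(Vec<Selection>),
}

fn collect_spreads(roots: &[Selection]) -> Vec<String> {
    let mut stack: Vec<&Selection> = roots.iter().collect();
    let mut names = Vec::new();
    while let Some(selection) = stack.pop() {
        match selection {
            Selection::FragmentSpread(name) => names.push(name.clone()),
            // Push children instead of recursing into them.
            Selection::LinkedField(children) => stack.extend(children),
        }
    }
    names
}

fn main() {
    let roots = vec![
        Selection::FragmentSpread("UserFragment".into()),
        Selection::LinkedField(vec![Selection::FragmentSpread("AvatarFragment".into())]),
    ];
    assert_eq!(collect_spreads(&roots), vec!["AvatarFragment", "UserFragment"]);
}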
1 change: 1 addition & 0 deletions compiler/crates/dependency-analyzer/src/lib.rs
@@ -18,6 +18,7 @@ mod schema_change_analyzer;
pub use ast::get_definition_references;
pub use ast::get_reachable_ast;
pub use ast::ReachableAst;
pub use ir::get_ir_definition_references;
pub use ir::get_reachable_ir;
pub use ir::ExecutableDefinitionNameMap;
pub use ir::ExecutableDefinitionNameSet;
1 change: 1 addition & 0 deletions compiler/crates/relay-compiler/Cargo.toml
@@ -48,6 +48,7 @@ lazy_static = "1.4"
log = { version = "0.4.17", features = ["kv_unstable", "kv_unstable_std"] }
md-5 = "0.10"
persist-query = { path = "../persist-query" }
petgraph = { version = "0.6.3", features = ["serde-1"] }
rayon = "1.2"
regex = "1.9.2"
relay-codegen = { path = "../relay-codegen" }
165 changes: 149 additions & 16 deletions compiler/crates/relay-compiler/src/build_project.rs
@@ -37,18 +37,23 @@ use common::PerfLogger;
use common::WithDiagnostics;
use dashmap::mapref::entry::Entry;
use dashmap::DashSet;
use dependency_analyzer::get_ir_definition_references;
use fnv::FnvBuildHasher;
use fnv::FnvHashMap;
use fnv::FnvHashSet;
pub use generate_artifacts::generate_artifacts;
pub use generate_artifacts::generate_preloadable_query_parameters_artifact;
pub use generate_artifacts::Artifact;
pub use generate_artifacts::ArtifactContent;
use graphql_ir::ExecutableDefinition;
use graphql_ir::ExecutableDefinitionName;
use graphql_ir::FragmentDefinitionNameSet;
use graphql_ir::Program;
use indexmap::IndexSet;
use log::debug;
use log::info;
use log::warn;
use petgraph::unionfind::UnionFind;
use rayon::iter::IntoParallelRefIterator;
use rayon::slice::ParallelSlice;
use relay_codegen::Printer;
@@ -83,7 +88,7 @@ use crate::file_source::SourceControlUpdateStatus;
use crate::graphql_asts::GraphQLAsts;

type BuildProjectOutput = WithDiagnostics<(ProjectName, Arc<SDLSchema>, Programs, Vec<Artifact>)>;
type BuildProgramsOutput = WithDiagnostics<(Programs, Arc<SourceHashes>)>;
type BuildProgramsOutput = WithDiagnostics<(Vec<Programs>, Arc<SourceHashes>)>;

pub enum BuildProjectFailure {
    Error(BuildProjectError),
@@ -141,6 +146,87 @@ pub fn build_raw_program(
    Ok((program, source_hashes))
}

const MIN_CHUNK_SIZE: usize = 8192;

/// Build raw programs and divide them into chunks for parallelization
fn build_raw_program_chunks(
    project_config: &ProjectConfig,
    project_asts: ProjectAsts,
    schema: Arc<SDLSchema>,
    log_event: &impl PerfLogEvent,
    build_mode: BuildMode,
) -> Result<(Vec<Program>, SourceHashes), BuildProjectError> {
    // Build a type-aware IR.
    let BuildIRResult { ir, source_hashes } = log_event.time("build_ir_time", || {
        build_ir::build_ir(project_config, project_asts, &schema, build_mode, log_event).map_err(
            |errors| BuildProjectError::ValidationErrors {
                errors,
                project_name: project_config.name,
            },
        )
    })?;

    let chunks = if ir.len() < MIN_CHUNK_SIZE {
        vec![ir]
    } else {
        let chunkify_time = log_event.start("chunkify_project_time");
        let dependency_map = get_ir_definition_references(&schema, &ir);
        let definition_indexes: IndexSet<ExecutableDefinitionName> = ir
            .iter()
            .map(|def| match def {
                ExecutableDefinition::Operation(operation) => {
                    ExecutableDefinitionName::OperationDefinitionName(operation.name.item)
                }
                ExecutableDefinition::Fragment(fragment) => {
                    ExecutableDefinitionName::FragmentDefinitionName(fragment.name.item)
                }
            })
            .collect();

        // Union definitions that reference each other so that every connected
        // component of the dependency graph lands in the same chunk.
        let mut unionfind = UnionFind::<usize>::new(definition_indexes.len());
        for (source, destinations) in &dependency_map {
            let source_index = definition_indexes.get_index_of(source).unwrap();
            for destination in destinations {
                let destination_index = definition_indexes.get_index_of(destination).unwrap();
                unionfind.union(source_index, destination_index);
            }
        }

        let mut groups = FxHashMap::default();
        for (idx, def) in ir.into_iter().enumerate() {
            let group = unionfind.find(idx);
            groups.entry(group).or_insert_with(Vec::new).push(def);
        }

        // Large components become chunks of their own; small ones are
        // buffered together until the buffer exceeds MIN_CHUNK_SIZE.
        let mut chunks = vec![];
        let mut buffer = Vec::new();
        for group in groups.into_values() {
            if group.len() > MIN_CHUNK_SIZE {
                chunks.push(group);
            } else {
                buffer.extend(group);
                if buffer.len() > MIN_CHUNK_SIZE {
                    chunks.push(std::mem::take(&mut buffer));
                }
            }
        }
        if !buffer.is_empty() {
            chunks.push(buffer);
        }
        log_event.stop(chunkify_time);
        chunks
    };

    // Turn the IR into base Programs.
    let programs = log_event.time("build_program_time", || {
        chunks
            .into_iter()
            .map(|definitions| Program::from_definitions(Arc::clone(&schema), definitions))
            .collect()
    });
    Ok((programs, source_hashes))
}
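For intuition, a self-contained sketch of the grouping step (toy integer indexes in place of IR definitions; UnionFind is the same petgraph type used above):

use petgraph::unionfind::UnionFind;
use std::collections::HashMap;

fn main() {
    // Five definitions; each pair is a "references" edge between two of them.
    let edges = [(0usize, 1), (1, 2), (3, 4)];
    let mut unionfind = UnionFind::<usize>::new(5);
    for (source, destination) in edges {
        unionfind.union(source, destination);
    }
    // Group definitions by component representative, as the chunking above
    // does before buffering small groups together.
    let mut groups: HashMap<usize, Vec<usize>> = HashMap::new();
    for idx in 0..5 {
        groups.entry(unionfind.find(idx)).or_default().push(idx);
    }
    // Two independent chunks: {0, 1, 2} and {3, 4}.
    println!("{:?}", groups.into_values().collect::<Vec<_>>());
}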

pub fn validate_program(
config: &Config,
project_config: &ProjectConfig,
@@ -270,26 +356,43 @@ pub fn build_programs(
            }
        },
    );
    let (programs, source_hashes) =
        build_raw_program_chunks(project_config, project_asts, schema, log_event, build_mode)?;

    if compiler_state.should_cancel_current_build() {
        debug!("Build is cancelled: updates in source code/or new file changes are pending.");
        return Err(BuildProjectFailure::Cancelled);
    }
    let base_fragment_names = Arc::new(base_fragment_names);
    let results: Vec<(Programs, Vec<Diagnostic>)> = programs
        .into_par_iter()
        .map(|program| {
            // Call validation rules that go beyond type checking.
            // FIXME: Return non-fatal diagnostics from transforms (only validations for now)
            let diagnostics = validate_program(config, project_config, &program, log_event)?;

            let programs = transform_program(
                project_config,
                Arc::new(program),
                Arc::clone(&base_fragment_names),
                Arc::clone(&perf_logger),
                log_event,
                config.custom_transforms.as_ref(),
            )?;

            Ok((programs, diagnostics))
        })
        .collect::<Result<Vec<_>, BuildProjectFailure>>()?;

    let len = results.len();
    // Unzip the per-chunk results into parallel lists of programs and diagnostics.
    let (programs, diagnostics) = results.into_iter().fold(
        (Vec::with_capacity(len), vec![]),
        |(mut programs, mut diagnostics), (temp_programs, temp_diagnostics)| {
            programs.push(temp_programs);
            diagnostics.extend(temp_diagnostics);
            (programs, diagnostics)
        },
    );

    Ok(WithDiagnostics {
        item: (programs, Arc::new(source_hashes)),
@@ -360,9 +463,19 @@

    // Generate artifacts by collecting information from the `Programs`.
    let artifacts_timer = log_event.start("generate_artifacts_time");
    let artifacts = programs
        .par_iter()
        .map(|programs| generate_artifacts(project_config, programs, Arc::clone(&source_hashes)))
        .flatten()
        .collect();
    log_event.stop(artifacts_timer);

    // Fold the per-chunk Programs back into a single Programs for reporting.
    let mut iter: std::vec::IntoIter<Programs> = programs.into_iter();
    let mut programs = iter.next().expect("Expect at least one result");
    for temp_programs in iter {
        merge_programs(&mut programs, temp_programs);
    }

    log_event.number(
        "generated_artifacts",
        programs.reader.document_count() + programs.normalization.document_count(),
@@ -376,6 +489,26 @@
    })
}

/// Merge the Programs produced for each chunk into a single Programs.
fn merge_programs(onto: &mut Programs, from: Programs) {
    merge_program(Arc::get_mut(&mut onto.source).unwrap(), from.source);
    merge_program(Arc::get_mut(&mut onto.reader).unwrap(), from.reader);
    merge_program(
        Arc::get_mut(&mut onto.normalization).unwrap(),
        from.normalization,
    );
    merge_program(
        Arc::get_mut(&mut onto.operation_text).unwrap(),
        from.operation_text,
    );
    merge_program(Arc::get_mut(&mut onto.typegen).unwrap(), from.typegen);
}

fn merge_program(onto: &mut Program, from: Arc<Program>) {
    let from = Arc::unwrap_or_clone(from);
    onto.fragments.extend(from.fragments);
    onto.operations.extend(from.operations);
}
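Note on the merge: Arc::get_mut only succeeds while each Programs is uniquely owned, which holds here because the per-chunk results were just collected and nothing else has cloned them, and Arc::unwrap_or_clone (stable since Rust 1.76) likewise takes the Program out without copying in that single-owner case.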

#[allow(clippy::too_many_arguments)]
pub async fn commit_project(
    config: &Config,
