From f7b030ec69f8fa9854db3e43d8db6184d8558584 Mon Sep 17 00:00:00 2001
From: Tianyu Yao
Date: Thu, 2 May 2024 11:18:36 -0700
Subject: [PATCH] Split a project into multiple chunks and parallelize
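
Building a project currently validates, transforms, and prints one
monolithic `Program`. This change builds the IR once, groups
definitions into connected components over their fragment references
(fragment spreads plus Relay Resolver fragment dependencies, via the
new `get_ir_definition_references`), packs components into chunks of
at least MIN_CHUNK_SIZE definitions, and then runs validation,
transforms, and artifact generation per chunk in parallel with rayon.
The per-chunk `Programs` are merged back together at the end, so the
rest of build_project and its callers still see a single `Programs`
value.

Definitions that reference each other must land in the same chunk, so
the components are computed with petgraph's union-find. A minimal
sketch of that grouping step (standalone, with a made-up helper name
and definition indexes; not code from this diff):

    use petgraph::unionfind::UnionFind;

    /// Illustrative helper, not part of the diff: label `n` definitions
    /// by connected component, given "definition a references
    /// definition b" edges.
    fn component_labels(n: usize, edges: &[(usize, usize)]) -> Vec<usize> {
        let mut uf = UnionFind::<usize>::new(n);
        for &(a, b) in edges {
            uf.union(a, b);
        }
        // Equal labels mean "must be compiled in the same chunk".
        (0..n).map(|i| uf.find(i)).collect()
    }
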
Reviewed By: evanyeung

Differential Revision: D56339821

fbshipit-source-id: 3ee092e5d3b513b35802efd27538a25e0f6778c0
---
 compiler/crates/dependency-analyzer/src/ir.rs |  52 ++++++
 .../crates/dependency-analyzer/src/lib.rs     |   1 +
 compiler/crates/relay-compiler/Cargo.toml     |   1 +
 .../relay-compiler/src/build_project.rs       | 165 ++++++++++++++++--
 4 files changed, 203 insertions(+), 16 deletions(-)

diff --git a/compiler/crates/dependency-analyzer/src/ir.rs b/compiler/crates/dependency-analyzer/src/ir.rs
index 613ddf162728d..70d8780b4ab71 100644
--- a/compiler/crates/dependency-analyzer/src/ir.rs
+++ b/compiler/crates/dependency-analyzer/src/ir.rs
@@ -305,3 +305,55 @@ fn add_descendants(
         }
     }
 }
+
+/// Get fragment references of each definition
+pub fn get_ir_definition_references<'a>(
+    schema: &SDLSchema,
+    definitions: impl IntoIterator<Item = &'a ExecutableDefinition>,
+) -> ExecutableDefinitionNameMap<ExecutableDefinitionNameSet> {
+    let mut result: ExecutableDefinitionNameMap<ExecutableDefinitionNameSet> = Default::default();
+    for definition in definitions {
+        let name = definition.name_with_location().item;
+        let name = match definition {
+            ExecutableDefinition::Operation(_) => OperationDefinitionName(name).into(),
+            ExecutableDefinition::Fragment(_) => FragmentDefinitionName(name).into(),
+        };
+        let mut selections: Vec<_> = match definition {
+            ExecutableDefinition::Operation(definition) => &definition.selections,
+            ExecutableDefinition::Fragment(definition) => &definition.selections,
+        }
+        .iter()
+        .collect();
+        let mut references: ExecutableDefinitionNameSet = Default::default();
+        while let Some(selection) = selections.pop() {
+            match selection {
+                Selection::FragmentSpread(selection) => {
+                    references.insert(selection.fragment.item.into());
+                }
+                Selection::LinkedField(selection) => {
+                    if let Some(fragment_name) = get_resolver_fragment_dependency_name(
+                        schema.field(selection.definition.item),
+                    ) {
+                        references.insert(fragment_name.into());
+                    }
+                    selections.extend(&selection.selections);
+                }
+                Selection::InlineFragment(selection) => {
+                    selections.extend(&selection.selections);
+                }
+                Selection::Condition(selection) => {
+                    selections.extend(&selection.selections);
+                }
+                Selection::ScalarField(selection) => {
+                    if let Some(fragment_name) = get_resolver_fragment_dependency_name(
+                        schema.field(selection.definition.item),
+                    ) {
+                        references.insert(fragment_name.into());
+                    }
+                }
+            }
+        }
+        result.insert(name, references);
+    }
+    result
+}
diff --git a/compiler/crates/dependency-analyzer/src/lib.rs b/compiler/crates/dependency-analyzer/src/lib.rs
index 834267797c29c..f34d5b28d220a 100644
--- a/compiler/crates/dependency-analyzer/src/lib.rs
+++ b/compiler/crates/dependency-analyzer/src/lib.rs
@@ -18,6 +18,7 @@ mod schema_change_analyzer;
 pub use ast::get_definition_references;
 pub use ast::get_reachable_ast;
 pub use ast::ReachableAst;
+pub use ir::get_ir_definition_references;
 pub use ir::get_reachable_ir;
 pub use ir::ExecutableDefinitionNameMap;
 pub use ir::ExecutableDefinitionNameSet;
diff --git a/compiler/crates/relay-compiler/Cargo.toml b/compiler/crates/relay-compiler/Cargo.toml
index dd0f32b4beafe..88abaebbaaffe 100644
--- a/compiler/crates/relay-compiler/Cargo.toml
+++ b/compiler/crates/relay-compiler/Cargo.toml
@@ -48,6 +48,7 @@ lazy_static = "1.4"
 log = { version = "0.4.17", features = ["kv_unstable", "kv_unstable_std"] }
 md-5 = "0.10"
 persist-query = { path = "../persist-query" }
+petgraph = { version = "0.6.3", features = ["serde-1"] }
 rayon = "1.2"
 regex = "1.9.2"
 relay-codegen = { path = "../relay-codegen" }
diff --git a/compiler/crates/relay-compiler/src/build_project.rs b/compiler/crates/relay-compiler/src/build_project.rs
index 1d1c66cfdf2c7..354d7c553bf0f 100644
--- a/compiler/crates/relay-compiler/src/build_project.rs
+++ b/compiler/crates/relay-compiler/src/build_project.rs
@@ -37,6 +37,7 @@ use common::PerfLogger;
 use common::WithDiagnostics;
 use dashmap::mapref::entry::Entry;
 use dashmap::DashSet;
+use dependency_analyzer::get_ir_definition_references;
 use fnv::FnvBuildHasher;
 use fnv::FnvHashMap;
 use fnv::FnvHashSet;
@@ -44,11 +45,15 @@ pub use generate_artifacts::generate_artifacts;
 pub use generate_artifacts::generate_preloadable_query_parameters_artifact;
 pub use generate_artifacts::Artifact;
 pub use generate_artifacts::ArtifactContent;
+use graphql_ir::ExecutableDefinition;
+use graphql_ir::ExecutableDefinitionName;
 use graphql_ir::FragmentDefinitionNameSet;
 use graphql_ir::Program;
+use indexmap::IndexSet;
 use log::debug;
 use log::info;
 use log::warn;
+use petgraph::unionfind::UnionFind;
 use rayon::iter::IntoParallelRefIterator;
 use rayon::slice::ParallelSlice;
 use relay_codegen::Printer;
@@ -83,7 +88,7 @@ use crate::file_source::SourceControlUpdateStatus;
 use crate::graphql_asts::GraphQLAsts;
 
 type BuildProjectOutput = WithDiagnostics<(ProjectName, Arc<SDLSchema>, Programs, Vec<Artifact>)>;
-type BuildProgramsOutput = WithDiagnostics<(Programs, Arc<SourceHashes>)>;
+type BuildProgramsOutput = WithDiagnostics<(Vec<Programs>, Arc<SourceHashes>)>;
 
 pub enum BuildProjectFailure {
     Error(BuildProjectError),
@@ -141,6 +146,87 @@ pub fn build_raw_program(
     Ok((program, source_hashes))
 }
 
+const MIN_CHUNK_SIZE: usize = 8192;
+
+/// Build raw programs and divide them into chunks for parallelization
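+///
+/// Definitions connected by fragment references (including Relay Resolver
+/// fragment dependencies) must end up in the same `Program`, so connected
+/// components are computed with a union-find over the dependency map and
+/// packed into chunks of at least MIN_CHUNK_SIZE definitions. Projects
+/// with fewer than MIN_CHUNK_SIZE definitions stay as a single chunk.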
+fn build_raw_program_chunks(
+    project_config: &ProjectConfig,
+    project_asts: ProjectAsts,
+    schema: Arc<SDLSchema>,
+    log_event: &impl PerfLogEvent,
+    build_mode: BuildMode,
+) -> Result<(Vec<Program>, SourceHashes), BuildProjectError> {
+    // Build a type aware IR.
+    let BuildIRResult { ir, source_hashes } = log_event.time("build_ir_time", || {
+        build_ir::build_ir(project_config, project_asts, &schema, build_mode, log_event).map_err(
+            |errors| BuildProjectError::ValidationErrors {
+                errors,
+                project_name: project_config.name,
+            },
+        )
+    })?;
+
+    let chunks = if ir.len() < MIN_CHUNK_SIZE {
+        vec![ir]
+    } else {
+        let chunkify_time = log_event.start("chunkify_project_time");
+        let dependency_map = get_ir_definition_references(&schema, &ir);
+        let definition_indexes: IndexSet<ExecutableDefinitionName> = ir
+            .iter()
+            .map(|def| match def {
+                ExecutableDefinition::Operation(operation) => {
+                    ExecutableDefinitionName::OperationDefinitionName(operation.name.item)
+                }
+                ExecutableDefinition::Fragment(fragment) => {
+                    ExecutableDefinitionName::FragmentDefinitionName(fragment.name.item)
+                }
+            })
+            .collect();
+
+        let mut unionfind = UnionFind::<usize>::new(definition_indexes.len());
+        for (source, destinations) in &dependency_map {
+            let source_index = definition_indexes.get_index_of(source).unwrap();
+            for destination in destinations {
+                let destination_index = definition_indexes.get_index_of(destination).unwrap();
+                unionfind.union(source_index, destination_index);
+            }
+        }
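+        // After the unions above, two definitions share a representative
+        // iff they are transitively connected by fragment references, so
+        // each component can be validated and transformed independently.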
+
+        let mut groups = FxHashMap::default();
+        for (idx, def) in ir.into_iter().enumerate() {
+            let group = unionfind.find(idx);
+            groups.entry(group).or_insert_with(Vec::new).push(def);
+        }
+
+        let mut chunks = vec![];
+        let mut buffer = Vec::new();
+        for group in groups.into_values() {
+            if group.len() > MIN_CHUNK_SIZE {
+                chunks.push(group);
+            } else {
+                buffer.extend(group);
+                if buffer.len() > MIN_CHUNK_SIZE {
+                    chunks.push(std::mem::take(&mut buffer));
+                }
+            }
+        }
+        if !buffer.is_empty() {
+            chunks.push(buffer);
+        }
+        log_event.stop(chunkify_time);
+        chunks
+    };
+
+    // Turn the IR into base Programs.
+    let programs = log_event.time("build_program_time", || {
+        chunks
+            .into_iter()
+            .map(|definitions| Program::from_definitions(Arc::clone(&schema), definitions))
+            .collect()
+    });
+    Ok((programs, source_hashes))
+}
+
 pub fn validate_program(
     config: &Config,
     project_config: &ProjectConfig,
@@ -270,26 +356,43 @@ pub fn build_programs(
             }
         },
     );
-    let (program, source_hashes) =
-        build_raw_program(project_config, project_asts, schema, log_event, build_mode)?;
+    let (programs, source_hashes) =
+        build_raw_program_chunks(project_config, project_asts, schema, log_event, build_mode)?;
     if compiler_state.should_cancel_current_build() {
         debug!("Build is cancelled: updates in source code/or new file changes are pending.");
         return Err(BuildProjectFailure::Cancelled);
     }
+    let base_fragment_names = Arc::new(base_fragment_names);
+    let results: Vec<(Programs, Vec<Diagnostic>)> = programs
+        .into_par_iter()
+        .map(|program| {
+            // Call validation rules that go beyond type checking.
+            // FIXME: Return non-fatal diagnostics from transforms (only validations for now)
+            let diagnostics = validate_program(config, project_config, &program, log_event)?;
+
+            let programs = transform_program(
+                project_config,
+                Arc::new(program),
+                Arc::clone(&base_fragment_names),
+                Arc::clone(&perf_logger),
+                log_event,
+                config.custom_transforms.as_ref(),
+            )?;
-    // Call validation rules that go beyond type checking.
-    // FIXME: Return non-fatal diagnostics from transforms (only validations for now)
-    let diagnostics = validate_program(config, project_config, &program, log_event)?;
-
-    let programs = transform_program(
-        project_config,
-        Arc::new(program),
-        Arc::new(base_fragment_names),
-        Arc::clone(&perf_logger),
-        log_event,
-        config.custom_transforms.as_ref(),
-    )?;
+            Ok((programs, diagnostics))
+        })
+        .collect::<Result<Vec<_>, BuildProjectFailure>>()?;
+
+    let len = results.len();
+    let (programs, diagnostics) = results.into_iter().fold(
+        (Vec::with_capacity(len), vec![]),
+        |(mut programs, mut diagnostics), (temp_programs, temp_diagnostics)| {
+            programs.push(temp_programs);
+            diagnostics.extend(temp_diagnostics);
+            (programs, diagnostics)
+        },
+    );
 
     Ok(WithDiagnostics {
         item: (programs, Arc::new(source_hashes)),
@@ -360,9 +463,19 @@ pub fn build_project(
     // Generate artifacts by collecting information from the `Programs`.
     let artifacts_timer = log_event.start("generate_artifacts_time");
-    let artifacts = generate_artifacts(project_config, &programs, Arc::clone(&source_hashes));
+    let artifacts = programs
+        .par_iter()
+        .map(|programs| generate_artifacts(project_config, programs, Arc::clone(&source_hashes)))
+        .flatten()
+        .collect();
     log_event.stop(artifacts_timer);
 
+    let mut iter: std::vec::IntoIter<Programs> = programs.into_iter();
+    let mut programs = iter.next().expect("Expect at least one result");
+    for temp_programs in iter {
+        merge_programs(&mut programs, temp_programs);
+    }
+
     log_event.number(
         "generated_artifacts",
         programs.reader.document_count() + programs.normalization.document_count(),
@@ -376,6 +489,26 @@
     })
 }
 
+fn merge_programs(onto: &mut Programs, from: Programs) {
+    merge_program(Arc::get_mut(&mut onto.source).unwrap(), from.source);
+    merge_program(Arc::get_mut(&mut onto.reader).unwrap(), from.reader);
+    merge_program(
+        Arc::get_mut(&mut onto.normalization).unwrap(),
+        from.normalization,
+    );
+    merge_program(
+        Arc::get_mut(&mut onto.operation_text).unwrap(),
+        from.operation_text,
+    );
+    merge_program(Arc::get_mut(&mut onto.typegen).unwrap(), from.typegen);
+}
+
+fn merge_program(onto: &mut Program, from: Arc<Program>) {
+    let from = Arc::unwrap_or_clone(from);
+    onto.fragments.extend(from.fragments);
+    onto.operations.extend(from.operations);
+}
+
 #[allow(clippy::too_many_arguments)]
 pub async fn commit_project(
     config: &Config,