From b9c96055355ebe23774edb0ecc656645bbffbe45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Haye=C3=9F?= Date: Tue, 2 Aug 2022 11:40:38 +0200 Subject: [PATCH] Add currently broken Apsp implementation Why it's broken is explained here: https://github.com/lf-lang/lingua-franca/pull/1228#issuecomment-1202254369 --- Rust/Savina/src/parallelism/Apsp.lf | 302 ++++++++++++++++++++++++++++ 1 file changed, 302 insertions(+) create mode 100644 Rust/Savina/src/parallelism/Apsp.lf diff --git a/Rust/Savina/src/parallelism/Apsp.lf b/Rust/Savina/src/parallelism/Apsp.lf new file mode 100644 index 0000000..0f974af --- /dev/null +++ b/Rust/Savina/src/parallelism/Apsp.lf @@ -0,0 +1,302 @@ +/** + * Copyright (C) 2020 TU Dresden + * + * This benchmark implements a parallel all pairs shortest path algorithm. In + * order to split the workload, the large input matrix of size graphSize x + * graphSize is split into smaller blocks of size blockSize x blockSize. Each of + * the worker reactors (ApspFloydWarshallBlock) processes one of these blocks. + * The worker reactors are organized in the same matrix pattern, replication the + * structure of the blocks within the large input matrix. Each of the workers + * operates on its local block data, and sends results to all other workers in + * the same column or in the same row. The data from the neighbors is then used + * to compute the next intermediate result and to update the local state + * accordingly. + * + * @author Christian Menard + * @author Hannes Klein + * @author Johannes Hayeß + */ + +target Rust { + build-type : Release, + cargo-features: [ "cli" ], + rust-include: [ "../lib/matrix.rs", "../lib/pseudo_random.rs"], +}; + +import BenchmarkRunner from "../lib/BenchmarkRunner.lf"; + +reactor ApspFloydWarshallBlock( + bank_index: usize(0), + row_index: usize(0), + graphSize: usize(300), + blockSize: usize(50), + dimension: usize(6), + verbose: bool(false) +) { + state bank_index(bank_index); + state row_index(row_index); + state graph_size(graphSize); + state block_size(blockSize); + state dimension(dimension); + state verbose(verbose); + + state num_neighbors: usize({=2 * (dimension - 1)=}); + state row_offset: usize({=row_index * blockSize=}); // row offset of the block of this reactor + state col_offset: usize({=bank_index * blockSize=}); // column offset of the block of this reactor + + state k: usize(0); // iteration counter + state reportedFinish: bool(false); + + input start: Matrix; + + input[dimension] fromRow: Arc>; + input[dimension] fromCol: Arc>; + + output toNeighbors: Arc>; + output finished: unit; + + logical action notifyNeighbors: Arc>; + + preamble {= + use std::sync::Arc; + use crate::matrix::Matrix; + + fn get_element_at( + row: usize, + col: usize, + row_ports: &ReadablePortBank>>, + col_ports: &ReadablePortBank>>, + ctx: &ReactionCtx, + block_size: usize, + row_index: usize, + bank_index: usize, + ) -> u64 { + let dest_row = row / block_size; + let dest_col = col / block_size; + let local_row = row % block_size; + let local_col = col % block_size; + + if dest_row == row_index { + ctx.use_ref_opt(&row_ports.get(dest_col), |r| { + *r.get(local_row, local_col) + }).unwrap() + } else if dest_col == bank_index { + ctx.use_ref_opt(&col_ports.get(dest_row), |c| { + *c.get(local_row, local_col) + }).unwrap() + } else { + eprintln!("Error: unexpected target location ({},{})", dest_col, dest_row); + std::process::exit(2); + } + + } + =} + + reaction(start) -> notifyNeighbors {= + // reset local state + self.k = 0; + self.reportedFinish = false; + + // start execution + let matrix = ctx.use_ref_opt(start, Clone::clone).unwrap(); + ctx.schedule_with_v(notifyNeighbors, Some(Arc::new(matrix)), Asap); + =} + + reaction(notifyNeighbors) -> toNeighbors {= + //notify all neighbors + ctx.set(toNeighbors, ctx.use_ref_opt(notifyNeighbors, Clone::clone).unwrap()); + =} + + reaction(fromRow, fromCol) -> notifyNeighbors, finished {= + // do nothing if complete + if self.k == self.graph_size { + return; + } + + // perform computation + let mut matrix: Matrix = Matrix::new(self.block_size, self.block_size); + let bs = self.block_size; + let ri = self.row_index; + let bi = self.bank_index; + + for i in 0..self.block_size { + for j in 0..self.block_size { + let gi = self.row_offset + i; + let gj = self.col_offset + j; + + let result = get_element_at(gi, self.k, &fromRow, &fromCol, &ctx, bs, ri, bi) + get_element_at(self.k, gj, &fromRow, &fromCol, &ctx, bs, ri, bi); + matrix.set(i, j, result.min(get_element_at(gi, gj, &fromRow, &fromCol, &ctx, bs, ri, bi))); + } + } + + // increment iteration count + self.k += 1; + + if self.k == self.graph_size { + if self.verbose && self.bank_index == 0 && self.row_index == 0 { + // debugging and result checking + for i in 0..self.block_size { + let mut result = "".to_string(); + for j in 0..self.block_size { + result = format!("{} {}", result, matrix.get(i, j)); + } + info!("{}", result); + } + } + ctx.set(finished, ()); + } + + // send the result to all neighbors in the next iteration + ctx.schedule_with_v(notifyNeighbors, Some(Arc::new(matrix)), Asap); + =} +} + +reactor ApspRow( + bank_index: usize(0), + blockSize: usize(50), + numNodes: usize(300), + dimension: usize(6), + dimension_sq: usize(36), + verbose: bool(false) +) { + + input start: Matrix; + output[dimension] finished: unit; + + input[dimension_sq] fromCol: Matrix; + output[dimension] toCol: Matrix; + + blocks = new[dimension] ApspFloydWarshallBlock( + row_index=bank_index, + blockSize=blockSize, + graphSize=numNodes, + dimension=dimension, + verbose=verbose + ); + + // connect all blocks within the row + (blocks.toNeighbors)+ -> blocks.fromRow; + + // block output to all column neighbours + blocks.toNeighbors -> toCol; + // block input from all column neighbours + fromCol -> interleaved(blocks.fromCol); + + // broadcast the incoming matrix to all blocks + (start)+ -> blocks.start; + // collect and forward finished signals from all blocks + blocks.finished -> finished; + + preamble {= + use crate::matrix::Matrix; + =} +} + +reactor ApspMatrix( + blockSize: usize(50), + numNodes: usize(300), + dimension: usize(6), + dimension_sq: usize(36), + verbose: bool(false) +) { + input start: Matrix; + output[dimension_sq] finished: unit; + + rows = new[dimension] ApspRow(blockSize=blockSize, numNodes=numNodes, dimension=dimension, dimension_sq=dimension_sq, verbose=verbose); + + // broadcast the incoming matrix to all rows + (start)+ -> rows.start; + // collect and forward finished signals from all blocks + rows.finished -> finished; + + (rows.toCol)+ -> rows.fromCol; + + preamble {= + use crate::matrix::Matrix; + =} +} + +main reactor ( + numIterations: usize(12), + maxEdgeWeight: usize(100), + blockSize: usize(50), + numNodes: usize(300), + verbose: bool(false) +) { + state num_iterations(numIterations); + state max_edge_weight(maxEdgeWeight); + state block_size(blockSize); + state num_nodes(numNodes); + state verbose(verbose); + + state graph_data: Matrix; + state num_blocks_finished: usize(0); + + runner = new BenchmarkRunner(num_iterations=numIterations); + matrix = new ApspMatrix( + blockSize=blockSize, + numNodes=numNodes, + dimension={=numNodes / blockSize=}, + dimension_sq={=(numNodes / blockSize)*(numNodes / blockSize)=}, + verbose=verbose + ); + + reaction(startup) {= + print_benchmark_info("ApspBenchmark"); + print_args!( + "numIterations", + self.num_iterations, + "maxEdgeWeight", + self.max_edge_weight, + "numNodes", + self.num_nodes, + "blockSize", + self.block_size + ); + print_system_info(); + + self.graph_data = generate_graph(self.num_nodes as i64, self.max_edge_weight as i64); + =} + + reaction(runner.start) -> matrix.start {= + // reset local state + self.num_blocks_finished = 0; + + // start execution + ctx.set(matrix__start, self.graph_data); + =} + + reaction (matrix.finished) -> runner.finished {= + for f in matrix__finished { + if ctx.is_present(&f) { + self.num_blocks_finished += 1; + } + } + let dimension = self.num_nodes / self.block_size; + if self.num_blocks_finished == dimension * dimension { + ctx.set(runner__finished, ()); + } + =} + + preamble {= + use crate::matrix::Matrix; + use crate::{print_args,reactors::benchmark_runner::{print_system_info, print_benchmark_info}}; + use crate::pseudo_random::PseudoRandomGenerator; + + fn generate_graph(n: i64, w: i64) -> Matrix { + let random = PseudoRandomGenerator::from(n); + let nu = n as usize; + let mut local_data: Matrix = Matrix::new(nu, nu); + + for i in 0..nu { + for j in (i+1)..nu { + let r = random.next_in_range(0..w).into() + 1; + local_data.set(i, j, r); + local_data.set(j, i, r); + } + } + + local_data + } + =} +} \ No newline at end of file