Skip to content

Commit

Permalink
Add currently broken Apsp implementation
Browse files Browse the repository at this point in the history
Why it's broken is explained here: lf-lang/lingua-franca#1228 (comment)
  • Loading branch information
jhaye committed Aug 2, 2022
1 parent 9cbd28c commit b9c9605
Showing 1 changed file with 302 additions and 0 deletions.
302 changes: 302 additions & 0 deletions Rust/Savina/src/parallelism/Apsp.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
/**
* Copyright (C) 2020 TU Dresden
*
* This benchmark implements a parallel all pairs shortest path algorithm. In
* order to split the workload, the large input matrix of size graphSize x
* graphSize is split into smaller blocks of size blockSize x blockSize. Each of
* the worker reactors (ApspFloydWarshallBlock) processes one of these blocks.
* The worker reactors are organized in the same matrix pattern, replication the
* structure of the blocks within the large input matrix. Each of the workers
* operates on its local block data, and sends results to all other workers in
* the same column or in the same row. The data from the neighbors is then used
* to compute the next intermediate result and to update the local state
* accordingly.
*
* @author Christian Menard
* @author Hannes Klein
* @author Johannes Hayeß
*/

target Rust {
build-type : Release,
cargo-features: [ "cli" ],
rust-include: [ "../lib/matrix.rs", "../lib/pseudo_random.rs"],
};

import BenchmarkRunner from "../lib/BenchmarkRunner.lf";

reactor ApspFloydWarshallBlock(
bank_index: usize(0),
row_index: usize(0),
graphSize: usize(300),
blockSize: usize(50),
dimension: usize(6),
verbose: bool(false)
) {
state bank_index(bank_index);
state row_index(row_index);
state graph_size(graphSize);
state block_size(blockSize);
state dimension(dimension);
state verbose(verbose);

state num_neighbors: usize({=2 * (dimension - 1)=});
state row_offset: usize({=row_index * blockSize=}); // row offset of the block of this reactor
state col_offset: usize({=bank_index * blockSize=}); // column offset of the block of this reactor

state k: usize(0); // iteration counter
state reportedFinish: bool(false);

input start: Matrix<u64>;

input[dimension] fromRow: Arc<Matrix<u64>>;
input[dimension] fromCol: Arc<Matrix<u64>>;

output toNeighbors: Arc<Matrix<u64>>;
output finished: unit;

logical action notifyNeighbors: Arc<Matrix<u64>>;

preamble {=
use std::sync::Arc;
use crate::matrix::Matrix;

fn get_element_at(
row: usize,
col: usize,
row_ports: &ReadablePortBank<Arc<Matrix<u64>>>,
col_ports: &ReadablePortBank<Arc<Matrix<u64>>>,
ctx: &ReactionCtx,
block_size: usize,
row_index: usize,
bank_index: usize,
) -> u64 {
let dest_row = row / block_size;
let dest_col = col / block_size;
let local_row = row % block_size;
let local_col = col % block_size;

if dest_row == row_index {
ctx.use_ref_opt(&row_ports.get(dest_col), |r| {
*r.get(local_row, local_col)
}).unwrap()
} else if dest_col == bank_index {
ctx.use_ref_opt(&col_ports.get(dest_row), |c| {
*c.get(local_row, local_col)
}).unwrap()
} else {
eprintln!("Error: unexpected target location ({},{})", dest_col, dest_row);
std::process::exit(2);
}

}
=}

reaction(start) -> notifyNeighbors {=
// reset local state
self.k = 0;
self.reportedFinish = false;

// start execution
let matrix = ctx.use_ref_opt(start, Clone::clone).unwrap();
ctx.schedule_with_v(notifyNeighbors, Some(Arc::new(matrix)), Asap);
=}

reaction(notifyNeighbors) -> toNeighbors {=
//notify all neighbors
ctx.set(toNeighbors, ctx.use_ref_opt(notifyNeighbors, Clone::clone).unwrap());
=}

reaction(fromRow, fromCol) -> notifyNeighbors, finished {=
// do nothing if complete
if self.k == self.graph_size {
return;
}

// perform computation
let mut matrix: Matrix<u64> = Matrix::new(self.block_size, self.block_size);
let bs = self.block_size;
let ri = self.row_index;
let bi = self.bank_index;

for i in 0..self.block_size {
for j in 0..self.block_size {
let gi = self.row_offset + i;
let gj = self.col_offset + j;

let result = get_element_at(gi, self.k, &fromRow, &fromCol, &ctx, bs, ri, bi) + get_element_at(self.k, gj, &fromRow, &fromCol, &ctx, bs, ri, bi);
matrix.set(i, j, result.min(get_element_at(gi, gj, &fromRow, &fromCol, &ctx, bs, ri, bi)));
}
}

// increment iteration count
self.k += 1;

if self.k == self.graph_size {
if self.verbose && self.bank_index == 0 && self.row_index == 0 {
// debugging and result checking
for i in 0..self.block_size {
let mut result = "".to_string();
for j in 0..self.block_size {
result = format!("{} {}", result, matrix.get(i, j));
}
info!("{}", result);
}
}
ctx.set(finished, ());
}

// send the result to all neighbors in the next iteration
ctx.schedule_with_v(notifyNeighbors, Some(Arc::new(matrix)), Asap);
=}
}

reactor ApspRow(
bank_index: usize(0),
blockSize: usize(50),
numNodes: usize(300),
dimension: usize(6),
dimension_sq: usize(36),
verbose: bool(false)
) {

input start: Matrix<u64>;
output[dimension] finished: unit;

input[dimension_sq] fromCol: Matrix<u64>;
output[dimension] toCol: Matrix<u64>;

blocks = new[dimension] ApspFloydWarshallBlock(
row_index=bank_index,
blockSize=blockSize,
graphSize=numNodes,
dimension=dimension,
verbose=verbose
);

// connect all blocks within the row
(blocks.toNeighbors)+ -> blocks.fromRow;

// block output to all column neighbours
blocks.toNeighbors -> toCol;
// block input from all column neighbours
fromCol -> interleaved(blocks.fromCol);

// broadcast the incoming matrix to all blocks
(start)+ -> blocks.start;
// collect and forward finished signals from all blocks
blocks.finished -> finished;

preamble {=
use crate::matrix::Matrix;
=}
}

reactor ApspMatrix(
blockSize: usize(50),
numNodes: usize(300),
dimension: usize(6),
dimension_sq: usize(36),
verbose: bool(false)
) {
input start: Matrix<u64>;
output[dimension_sq] finished: unit;

rows = new[dimension] ApspRow(blockSize=blockSize, numNodes=numNodes, dimension=dimension, dimension_sq=dimension_sq, verbose=verbose);

// broadcast the incoming matrix to all rows
(start)+ -> rows.start;
// collect and forward finished signals from all blocks
rows.finished -> finished;

(rows.toCol)+ -> rows.fromCol;

preamble {=
use crate::matrix::Matrix;
=}
}

main reactor (
numIterations: usize(12),
maxEdgeWeight: usize(100),
blockSize: usize(50),
numNodes: usize(300),
verbose: bool(false)
) {
state num_iterations(numIterations);
state max_edge_weight(maxEdgeWeight);
state block_size(blockSize);
state num_nodes(numNodes);
state verbose(verbose);

state graph_data: Matrix<u64>;
state num_blocks_finished: usize(0);

runner = new BenchmarkRunner(num_iterations=numIterations);
matrix = new ApspMatrix(
blockSize=blockSize,
numNodes=numNodes,
dimension={=numNodes / blockSize=},
dimension_sq={=(numNodes / blockSize)*(numNodes / blockSize)=},
verbose=verbose
);

reaction(startup) {=
print_benchmark_info("ApspBenchmark");
print_args!(
"numIterations",
self.num_iterations,
"maxEdgeWeight",
self.max_edge_weight,
"numNodes",
self.num_nodes,
"blockSize",
self.block_size
);
print_system_info();

self.graph_data = generate_graph(self.num_nodes as i64, self.max_edge_weight as i64);
=}

reaction(runner.start) -> matrix.start {=
// reset local state
self.num_blocks_finished = 0;

// start execution
ctx.set(matrix__start, self.graph_data);
=}

reaction (matrix.finished) -> runner.finished {=
for f in matrix__finished {
if ctx.is_present(&f) {
self.num_blocks_finished += 1;
}
}
let dimension = self.num_nodes / self.block_size;
if self.num_blocks_finished == dimension * dimension {
ctx.set(runner__finished, ());
}
=}

preamble {=
use crate::matrix::Matrix;
use crate::{print_args,reactors::benchmark_runner::{print_system_info, print_benchmark_info}};
use crate::pseudo_random::PseudoRandomGenerator;

fn generate_graph(n: i64, w: i64) -> Matrix<u64> {
let random = PseudoRandomGenerator::from(n);
let nu = n as usize;
let mut local_data: Matrix<u64> = Matrix::new(nu, nu);

for i in 0..nu {
for j in (i+1)..nu {
let r = random.next_in_range(0..w).into() + 1;
local_data.set(i, j, r);
local_data.set(j, i, r);
}
}

local_data
}
=}
}

0 comments on commit b9c9605

Please sign in to comment.