From 60a80e6ea2ab64bc69f7d7b3150531173b11f973 Mon Sep 17 00:00:00 2001 From: Andreas Hoenselaar Date: Tue, 13 Jul 2021 11:12:37 -0700 Subject: [PATCH] Streamline communication (#1656) --- python/meep.i | 8 +++ scheme/Makefile.am | 2 +- scheme/meep.i | 9 ++- src/boundaries.cpp | 129 ++++++++++++++++++++++++++++++++------- src/fields.cpp | 16 ++--- src/meep.hpp | 124 ++++++++++++++++++++++++++++++++------ src/meep/vec.hpp | 4 +- src/mympi.cpp | 103 +++++++++++++++++--------------- src/step.cpp | 146 +++++++++++++++++++++++++++++++-------------- 9 files changed, 391 insertions(+), 150 deletions(-) diff --git a/python/meep.i b/python/meep.i index de0166b2a..b802e963a 100644 --- a/python/meep.i +++ b/python/meep.i @@ -1469,7 +1469,15 @@ void _get_gradient(PyObject *grad, PyObject *fields_a, PyObject *fields_f, PyObj %ignore is_medium; %ignore is_medium; %ignore is_metal; +%ignore meep::all_in_or_out; +%ignore meep::all_connect_phases; %ignore meep::choose_chunkdivision; +%ignore meep::comms_key; +%ignore meep::comms_key_hash_fn; +%ignore meep::comms_manager; +%ignore meep::comms_operation; +%ignore meep::comms_sequence; +%ignore meep::create_comms_manager; %ignore meep::fields_chunk; %ignore meep::infinity; diff --git a/scheme/Makefile.am b/scheme/Makefile.am index 9b8329b54..d26b5bae3 100644 --- a/scheme/Makefile.am +++ b/scheme/Makefile.am @@ -39,7 +39,7 @@ endif # workaround missing namespace prefix in swig meep_renames.i: $(LIBHDRS) - (echo "// AUTOMATICALLY GENERATED -- DO NOT EDIT"; perl -pe 's/^ *class +([A-Za-z_0-9:]*)( *| *:[^{]*){.*$$/%rename(meep_\1) meep::\1;/' $(LIBHDRS) | grep "%rename" | sed '/meep_fields_chunk/d' | sort -u; echo; grep -hv typedef $(LIBHDRS) | perl -pe 's/(inline|const|extern|static) +//g' | perl -pe 's/^[A-Za-z_0-9:<>]+[* ]+([A-Za-z_0-9:]*) *\(.*$$/%rename(meep_\1) meep::\1;/' | grep "%rename" | sed '/choose_chunkdivision/d' | sort -u; ) > $@ + (echo "// AUTOMATICALLY GENERATED -- DO NOT EDIT"; perl -pe 's/^ *class +([A-Za-z_0-9:]*)( *| *:[^{]*){.*$$/%rename(meep_\1) meep::\1;/' $(LIBHDRS) | grep "%rename" | sed '/meep_fields_chunk/d ; /meep_comms_manager/d ; /meep_comms_key_hash_fn/d' | sort -u; echo; grep -hv typedef $(LIBHDRS) | perl -pe 's/(inline|const|extern|static) +//g' | perl -pe 's/^[A-Za-z_0-9:<>]+[* ]+([A-Za-z_0-9:]*) *\(.*$$/%rename(meep_\1) meep::\1;/' | grep "%rename" | sed '/choose_chunkdivision/d ; /meep_create_comms_manager/d' | sort -u; ) > $@ # work around bug in swig, where it doesn't prepend namespace to friend funcs meep_swig_bug_workaround.i: $(LIBHDRS) diff --git a/scheme/meep.i b/scheme/meep.i index a03fde7f6..b88885e5d 100644 --- a/scheme/meep.i +++ b/scheme/meep.i @@ -280,7 +280,15 @@ static meep::vec my_kpoint_func(double freq, int mode, void *user_data) { %warnfilter(302,325,451,503,509); +%ignore meep::all_in_or_out; +%ignore meep::all_connect_phases; %ignore meep::choose_chunkdivision; +%ignore meep::comms_key; +%ignore meep::comms_key_hash_fn; +%ignore meep::comms_manager; +%ignore meep::comms_operation; +%ignore meep::comms_sequence; +%ignore meep::create_comms_manager; %ignore meep::fields_chunk; %ignore meep_geom::fragment_stats; @@ -300,4 +308,3 @@ static meep::vec my_kpoint_func(double freq, int mode, void *user_data) { %} %include "meep-ctl-swig.hpp" - diff --git a/src/boundaries.cpp b/src/boundaries.cpp index 335c9267c..e0dc87287 100644 --- a/src/boundaries.cpp +++ b/src/boundaries.cpp @@ -15,10 +15,13 @@ % Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
 */
 
+#include <algorithm>
+#include <map>
 #include <stdio.h>
 
 #include "meep.hpp"
+#include "meep/mympi.hpp"
 #include "meep_internals.hpp"
 
 #define UNUSED(x) (void)x // silence compiler warnings
 
@@ -27,6 +30,52 @@ using namespace std;
 
 namespace meep {
 
+namespace {
+
+// Creates an optimized comms_sequence from a vector of comms_operations.
+// Send operations are prioritized in descending order by the amount of data that is transferred.
+comms_sequence optimize_comms_operations(const std::vector<comms_operation> &operations) {
+  comms_sequence ret;
+  std::map<int, size_t> send_size_by_my_chunk_idx;
+  std::map<int, std::vector<comms_operation> > send_ops_by_my_chunk_idx;
+
+  for (const auto &op : operations) {
+    if (op.comm_direction == Incoming) {
+      ret.receive_ops.push_back(op);
+      continue;
+    }
+
+    // Group send operations by source chunk and accumulate the transfer size - excluding chunk
+    // pairs that reside on the same processor.
+    if (op.other_proc_id != my_rank()) {
+      send_size_by_my_chunk_idx[op.my_chunk_idx] += op.transfer_size;
+    }
+    else {
+      // Make sure that op.my_chunk_idx is represented in the map.
+      send_size_by_my_chunk_idx[op.my_chunk_idx] += 0;
+    }
+    send_ops_by_my_chunk_idx[op.my_chunk_idx].push_back(op);
+  }
+
+  // Sort in descending order to prioritize large transfers.
+  std::vector<std::pair<int, size_t> > send_op_sizes(send_size_by_my_chunk_idx.begin(),
+                                                     send_size_by_my_chunk_idx.end());
+  std::sort(send_op_sizes.begin(), send_op_sizes.end(),
+            [](const std::pair<int, size_t> &a, const std::pair<int, size_t> &b) -> bool {
+              return a.second > b.second;
+            });
+
+  // Assemble send operations.
+  for (const auto &size_pair : send_op_sizes) {
+    int my_chunk_idx = size_pair.first;
+    const auto &ops_vector = send_ops_by_my_chunk_idx[my_chunk_idx];
+    ret.send_ops.insert(std::end(ret.send_ops), std::begin(ops_vector), std::end(ops_vector));
+  }
+  return ret;
+}
+
+} // namespace
+
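A note on what this ordering buys: receives keep their discovery order, while sends are grouped by source chunk and issued for the chunks with the most off-process data first, so the largest transfers are in flight while later buffers are still being packed (same-processor pairs stay in send_ops but contribute zero weight). A minimal, self-contained sketch of the same greedy ordering with stand-in data, not the meep types:

    #include <algorithm>
    #include <cstdio>
    #include <utility>
    #include <vector>

    int main() {
      // Hypothetical total off-process bytes per source chunk.
      std::vector<std::pair<int, std::size_t> > totals = {{0, 10}, {1, 400}, {2, 70}};
      // Same comparator as optimize_comms_operations: descending by transfer size.
      std::sort(totals.begin(), totals.end(),
                [](const std::pair<int, std::size_t> &a, const std::pair<int, std::size_t> &b) {
                  return a.second > b.second;
                });
      for (const auto &t : totals)
        std::printf("chunk %d sends %zu\n", t.first, t.second); // order: 1, 2, 0
      return 0;
    }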
 void fields::set_boundary(boundary_side b, direction d, boundary_condition cond) {
   if (boundaries[b][d] != cond) {
     boundaries[b][d] = cond;
@@ -116,10 +165,10 @@ void fields::disconnect_chunks() {
     for (int i = 0; i < num_chunks * num_chunks; i++) {
       delete[] comm_blocks[ft][i];
       comm_blocks[ft][i] = 0;
-      for (int ip = 0; ip < 3; ++ip)
-        comm_sizes[ft][ip][i] = 0;
     }
+    comms_sequence_for_field[ft].clear();
   }
+  comm_sizes.clear();
 }
 
 // this should be called by any code that might set chunk_connections_valid = false,
@@ -309,9 +358,9 @@ bool fields_chunk::needs_W_notowned(component c) {
 }
 
 void fields::connect_the_chunks() {
-  size_t *nc[NUM_FIELD_TYPES][3][2];
+  size_t *nc[NUM_FIELD_TYPES][NUM_CONNECT_PHASE_TYPES][2];
   FOR_FIELD_TYPES(f) {
-    for (int ip = 0; ip < 3; ip++)
+    for (connect_phase ip : all_connect_phases)
       for (int io = 0; io < 2; io++) {
         nc[f][ip][io] = new size_t[num_chunks];
         for (int i = 0; i < num_chunks; i++)
@@ -356,14 +405,10 @@ void fields::connect_the_chunks() {
   FOR_E_AND_H(c) { needs_W_notowned[c] = or_to_all(needs_W_notowned[c]); }
   finished_working();
 
+  comm_sizes.clear();
   for (int i = 0; i < num_chunks; i++) {
     // First count the border elements...
     const grid_volume vi = chunks[i]->gv;
-    FOR_FIELD_TYPES(ft) {
-      for (int ip = 0; ip < 3; ip++)
-        for (int j = 0; j < num_chunks; j++)
-          comm_sizes[ft][ip][j + i * num_chunks] = 0;
-    }
     FOR_COMPONENTS(corig) {
       if (have_component(corig)) LOOP_OVER_VOL_NOTOWNED(vi, corig, n) {
           IVEC_LOOP_ILOC(vi, here);
@@ -372,10 +417,10 @@ void fields::connect_the_chunks() {
           complex<double> thephase;
           if (locate_component_point(&c, &here, &thephase) && !on_metal_boundary(here))
             for (int j = 0; j < num_chunks; j++) {
+              const std::pair<int, int> pair_j_to_i{j, i};
               if ((chunks[i]->is_mine() || chunks[j]->is_mine()) && chunks[j]->gv.owns(here) &&
                   !(is_B(corig) && is_B(c) && B_redundant[5 * i + corig - Bx] &&
                     B_redundant[5 * j + c - Bx])) {
-                const int pair = j + i * num_chunks;
                 const connect_phase ip = thephase == 1.0
                                              ? CONNECT_COPY
                                              : (thephase == -1.0 ? CONNECT_NEGATE : CONNECT_PHASE);
@@ -384,14 +429,14 @@ void fields::connect_the_chunks() {
                   const int nn = is_real ? 1 : 2;
                   nc[f][ip][Incoming][i] += nn;
                   nc[f][ip][Outgoing][j] += nn;
-                  comm_sizes[f][ip][pair] += nn;
+                  comm_sizes[{f, ip, pair_j_to_i}] += nn;
                 }
                 if (needs_W_notowned[corig]) {
                   field_type f = is_electric(corig) ? WE_stuff : WH_stuff;
                   const int nn = is_real ? 1 : 2;
                   nc[f][ip][Incoming][i] += nn;
                   nc[f][ip][Outgoing][j] += nn;
-                  comm_sizes[f][ip][pair] += nn;
+                  comm_sizes[{f, ip, pair_j_to_i}] += nn;
                 }
                 if (is_electric(corig) || is_magnetic(corig)) {
                   field_type f = is_electric(corig) ? PE_stuff : PH_stuff;
@@ -411,11 +456,11 @@ void fields::connect_the_chunks() {
                   const size_t nn = (is_real ? 1 : 2) * (cni);
                   nc[f][ip][Incoming][i] += nn;
                   nc[f][ip][Outgoing][j] += nn;
-                  comm_sizes[f][ip][pair] += nn;
+                  comm_sizes[{f, ip, pair_j_to_i}] += nn;
                   const connect_phase iip = CONNECT_COPY;
                   nc[f][iip][Incoming][i] += ni;
                   nc[f][iip][Outgoing][j] += ni;
-                  comm_sizes[f][iip][pair] += ni;
+                  comm_sizes[{f, iip, pair_j_to_i}] += ni;
                 }
               } // if is_mine and owns...
             }   // loop over j chunks
@@ -426,7 +471,7 @@ void fields::connect_the_chunks() {
     FOR_FIELD_TYPES(ft) {
       for (int j = 0; j < num_chunks; j++) {
         delete[] comm_blocks[ft][j + i * num_chunks];
-        comm_blocks[ft][j + i * num_chunks] = new realnum[comm_size_tot(ft, j + i * num_chunks)];
+        comm_blocks[ft][j + i * num_chunks] = new realnum[comm_size_tot(ft, {j, i})];
       }
     }
   } // loop over i chunks
@@ -438,15 +483,15 @@ for i < i' */
 
   // wh stores the current indices in the connections array(s)
-  size_t *wh[NUM_FIELD_TYPES][3][2];
+  size_t *wh[NUM_FIELD_TYPES][NUM_CONNECT_PHASE_TYPES][2];
 
   /* Now allocate the connection arrays... this is still slightly
      wasteful (by a factor of 2) because we allocate for chunks we don't
      own if we have a connection with them. Removing this waste would mean
      a bunch more is_mine() checks below.
   */
   FOR_FIELD_TYPES(f) {
-    for (int ip = 0; ip < 3; ip++) {
-      for (int io = 0; io < 2; io++) {
+    for (connect_phase ip : all_connect_phases) {
+      for (in_or_out io : all_in_or_out) {
         for (int i = 0; i < num_chunks; i++)
           chunks[i]->alloc_extra_connections(field_type(f), connect_phase(ip), in_or_out(io),
                                              nc[f][ip][io][i]);
@@ -465,11 +510,10 @@ void fields::connect_the_chunks() {
 
     // initialize wh[f][ip][Incoming][j] to sum of comm_sizes for jj < j
     FOR_FIELD_TYPES(f) {
-      for (int ip = 0; ip < 3; ip++) {
+      for (connect_phase ip : all_connect_phases) {
         wh[f][ip][Incoming][0] = 0;
         for (int j = 1; j < num_chunks; ++j)
-          wh[f][ip][Incoming][j] =
-              wh[f][ip][Incoming][j - 1] + comm_sizes[f][ip][(j - 1) + i * num_chunks];
+          wh[f][ip][Incoming][j] = wh[f][ip][Incoming][j - 1] + get_comm_size({f, ip, {j - 1, i}});
       }
     }
 
@@ -555,10 +599,51 @@ void fields::connect_the_chunks() {
         } // LOOP_OVER_VOL_NOTOWNED
       }   // FOR_COMPONENTS
   }       // loop over i chunks
+
   FOR_FIELD_TYPES(f) {
-    for (int ip = 0; ip < 3; ip++)
+    for (connect_phase ip : all_connect_phases)
       for (int io = 0; io < 2; io++)
         delete[] wh[f][ip][io];
+
+    // Calculate the sequence of sends and receives in advance.
+    // Initiate receive operations as early as possible.
+    std::unique_ptr<comms_manager> manager = create_comms_manager();
+    std::vector<comms_operation> operations;
+    std::vector<int> tagto(count_processors());
+
+    for (int j = 0; j < num_chunks; j++) {
+      for (int i = 0; i < num_chunks; i++) {
+        const chunk_pair pair{j, i};
+        const size_t comm_size = comm_size_tot(f, pair);
+        if (!comm_size) continue;
+        if (comm_size > manager->max_transfer_size()) {
+          // MPI uses int for size to send/recv
+          meep::abort("communications size too big for the current implementation");
+        }
+        const int pair_idx = j + i * num_chunks;
+
+        if (chunks[j]->is_mine()) {
+          operations.push_back(comms_operation{/*my_chunk_idx=*/j,
+                                               /*other_chunk_idx=*/i,
+                                               /*other_proc_id=*/chunks[i]->n_proc(),
+                                               /*pair_idx=*/pair_idx,
+                                               /*transfer_size=*/comm_size,
+                                               /*comm_direction=*/Outgoing,
+                                               /*tag=*/tagto[chunks[i]->n_proc()]++});
+        }
+        if (chunks[i]->is_mine()) {
+          operations.push_back(comms_operation{/*my_chunk_idx=*/i,
+                                               /*other_chunk_idx=*/j,
+                                               /*other_proc_id=*/chunks[j]->n_proc(),
+                                               /*pair_idx=*/pair_idx,
+                                               /*transfer_size=*/comm_size,
+                                               /*comm_direction=*/Incoming,
+                                               /*tag=*/tagto[chunks[j]->n_proc()]++});
+        }
+      }
+    }
+
+    comms_sequence_for_field[f] = optimize_comms_operations(operations);
   }
   delete[] B_redundant;
 }
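The tag counters deserve a remark: every rank executes the same deterministic (j, i) loop, and both endpoints of a transfer increment their own per-peer counter at the same iteration, so the n-th message between any two ranks carries tag n on both sides without any negotiation. A toy sketch of that invariant (next_tag is a hypothetical helper, not part of the patch):

    #include <cassert>
    #include <vector>

    // Hypothetical helper: one counter per peer process, as in connect_the_chunks.
    int next_tag(std::vector<int> &tagto, int peer) { return tagto[peer]++; }

    int main() {
      std::vector<int> tags_on_sender(4, 0), tags_on_receiver(4, 0);
      // Both ranks walk the same pair order, so the counters advance in lockstep:
      // rank 1 sends to rank 2 three times, and the tags match pairwise.
      for (int msg = 0; msg < 3; ++msg)
        assert(next_tag(tags_on_sender, /*peer=*/2) == next_tag(tags_on_receiver, /*peer=*/1));
      return 0;
    }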
diff --git a/src/fields.cpp b/src/fields.cpp
index d48c2c9d0..c78b7d4f9 100644
--- a/src/fields.cpp
+++ b/src/fields.cpp
@@ -58,11 +58,6 @@ fields::fields(structure *s, double m, double beta, bool zero_fields_near_cylori
   for (int i = 0; i < num_chunks; i++)
     chunks[i] = new fields_chunk(s->chunks[i], outdir, m, beta, zero_fields_near_cylorigin, i);
   FOR_FIELD_TYPES(ft) {
-    for (int ip = 0; ip < 3; ip++) {
-      comm_sizes[ft][ip] = new size_t[num_chunks * num_chunks];
-      for (int i = 0; i < num_chunks * num_chunks; i++)
-        comm_sizes[ft][ip][i] = 0;
-    }
     typedef realnum *realnum_ptr;
     comm_blocks[ft] = new realnum_ptr[num_chunks * num_chunks];
     for (int i = 0; i < num_chunks * num_chunks; i++)
@@ -116,11 +111,6 @@ fields::fields(const fields &thef)
   for (int i = 0; i < num_chunks; i++)
     chunks[i] = new fields_chunk(*thef.chunks[i], i);
   FOR_FIELD_TYPES(ft) {
-    for (int ip = 0; ip < 3; ip++) {
-      comm_sizes[ft][ip] = new size_t[num_chunks * num_chunks];
-      for (int i = 0; i < num_chunks * num_chunks; i++)
-        comm_sizes[ft][ip][i] = 0;
-    }
     typedef realnum *realnum_ptr;
     comm_blocks[ft] = new realnum_ptr[num_chunks * num_chunks];
     for (int i = 0; i < num_chunks * num_chunks; i++)
@@ -140,8 +130,6 @@ fields::~fields() {
     for (int i = 0; i < num_chunks * num_chunks; i++)
       delete[] comm_blocks[ft][i];
     delete[] comm_blocks[ft];
-    for (int ip = 0; ip < 3; ip++)
-      delete[] comm_sizes[ft][ip];
   }
   delete sources;
   delete fluxes;
@@ -772,4 +760,8 @@ double linear_interpolate(double rx, double ry, double rz, double *data,
 #undef D
 }
 
+bool operator==(const comms_key &lhs, const comms_key &rhs) {
+  return (lhs.ft == rhs.ft) && (lhs.phase == rhs.phase) && (lhs.pair == rhs.pair);
+}
+
 } // namespace meep
diff --git a/src/meep.hpp b/src/meep.hpp
index f59815583..a01b42f09 100644
--- a/src/meep.hpp
+++ b/src/meep.hpp
@@ -17,7 +17,10 @@
 #ifndef MEEP_H
 #define MEEP_H
 
+#include <functional>
+#include <limits>
 #include <stdio.h>
+#include <memory>
 #include <math.h>
 #include <vector>
 #include <complex>
@@ -711,6 +714,81 @@ boundary_region pml(double thickness, double Rasymptotic = 1e-15, double mean_st
 
 class binary_partition;
 
+enum in_or_out { Incoming = 0, Outgoing };
+constexpr std::initializer_list<in_or_out> all_in_or_out{Incoming, Outgoing};
+
+enum connect_phase { CONNECT_PHASE = 0, CONNECT_NEGATE = 1, CONNECT_COPY = 2 };
+constexpr std::initializer_list<connect_phase> all_connect_phases{
+    CONNECT_PHASE, CONNECT_NEGATE, CONNECT_COPY};
+constexpr int NUM_CONNECT_PHASE_TYPES = 3;
+
+// Communication pair (i, j) implies data being sent from i to j.
+using chunk_pair = std::pair<int, int>;
+
+// Key for the map that stores communication sizes.
+struct comms_key {
+  field_type ft;
+  connect_phase phase;
+  chunk_pair pair;
+};
+
+// Defined in fields.cpp
+bool operator==(const comms_key &lhs, const comms_key &rhs);
+
+class comms_key_hash_fn {
+
+public:
+  inline std::size_t operator()(const comms_key &key) const {
+    // Unroll hash combination to promote the generation of efficient code.
+    std::size_t ret = ft_hasher(key.ft);
+    ret ^= connect_phase_hasher(key.phase) + kHashAddConst + (ret << 6) + (ret >> 2);
+    ret ^= int_hasher(key.pair.first) + kHashAddConst + (ret << 6) + (ret >> 2);
+    ret ^= int_hasher(key.pair.second) + kHashAddConst + (ret << 6) + (ret >> 2);
+    return ret;
+  }
+
+private:
+  static constexpr size_t kHashAddConst = 0x9e3779b9;
+  std::hash<int> int_hasher;
+  std::hash<int> connect_phase_hasher;
+  std::hash<int> ft_hasher;
+};
+
+// Represents a communication operation between chunks.
+struct comms_operation {
+  ptrdiff_t my_chunk_idx;
+  ptrdiff_t other_chunk_idx;
+  int other_proc_id;
+  int pair_idx; // The numeric pair index used in many communications related arrays.
+  size_t transfer_size;
+  in_or_out comm_direction;
+  int tag;
+};
+
+struct comms_sequence {
+  std::vector<comms_operation> receive_ops;
+  std::vector<comms_operation> send_ops;
+
+  void clear() {
+    receive_ops.clear();
+    send_ops.clear();
+  }
+};
+
+// RAII based comms_manager that allows asynchronous send and receive functions to be initiated.
+// Upon destruction, the comms_manager waits for completion of all enqueued operations.
+class comms_manager {
+ public:
+  virtual ~comms_manager() {}
+  virtual void send_real_async(const void *buf, size_t count, int dest, int tag) = 0;
+  virtual void receive_real_async(void *buf, size_t count, int source, int tag) = 0;
+  virtual size_t max_transfer_size() const { return std::numeric_limits<size_t>::max(); }
+};
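kHashAddConst is the 32-bit golden-ratio constant 0x9e3779b9, and each statement in operator() is one step of the familiar Boost-style hash_combine recurrence, unrolled by hand over the three key fields. The same step written as the usual helper, for reference (a sketch, not part of the patch):

    #include <cstddef>
    #include <functional>

    // One hash_combine step; comms_key_hash_fn applies this three times in a row.
    template <typename T> inline void hash_combine(std::size_t &seed, const T &v) {
      seed ^= std::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
    }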
+// Factory function for `comms_manager`.
+std::unique_ptr<comms_manager> create_comms_manager();
+
 class structure {
 
  public:
   structure_chunk **chunks;
@@ -1320,8 +1398,6 @@ class dft_fields {
   volume where;
 };
 
-enum in_or_out { Incoming = 0, Outgoing };
-enum connect_phase { CONNECT_PHASE = 0, CONNECT_NEGATE = 1, CONNECT_COPY = 2 };
 
 // data for each susceptibility
 typedef struct polarization_state_s {
@@ -1359,8 +1435,8 @@ class fields_chunk {
   realnum **zeroes[NUM_FIELD_TYPES]; // Holds pointers to metal points.
   size_t num_zeroes[NUM_FIELD_TYPES];
 
-  realnum **connections[NUM_FIELD_TYPES][CONNECT_COPY + 1][Outgoing + 1];
-  size_t num_connections[NUM_FIELD_TYPES][CONNECT_COPY + 1][Outgoing + 1];
+  realnum **connections[NUM_FIELD_TYPES][NUM_CONNECT_PHASE_TYPES][Outgoing + 1];
+  size_t num_connections[NUM_FIELD_TYPES][NUM_CONNECT_PHASE_TYPES][Outgoing + 1];
   std::complex<realnum> *connection_phases[NUM_FIELD_TYPES];
 
   int npol[NUM_FIELD_TYPES]; // only E_stuff and H_stuff are used
@@ -1570,19 +1646,6 @@ class fields {
   flux_vol *fluxes;
   symmetry S;
 
-  // The following is an array that is num_chunks by num_chunks. Actually
-  // it is two arrays, one for the imaginary and one for the real part.
-  realnum **comm_blocks[NUM_FIELD_TYPES];
-  // This is the same size as each comm_blocks array, and store the sizes
-  // of the comm blocks themselves for each connection-phase type
-  size_t *comm_sizes[NUM_FIELD_TYPES][CONNECT_COPY + 1];
-  size_t comm_size_tot(int f, int pair) const {
-    size_t sum = 0;
-    for (int ip = 0; ip < 3; ++ip)
-      sum += comm_sizes[f][ip][pair];
-    return sum;
-  }
-
   double a, dt; // The resolution a and timestep dt=Courant/a
   grid_volume gv, user_volume;
   volume v;
@@ -2057,8 +2120,6 @@ class fields {
   bool locate_point_in_user_volume(ivec *, std::complex<double> *phase) const;
   void locate_volume_source_in_user_volume(const vec p1, const vec p2, vec newp1[8], vec newp2[8],
                                            std::complex<double> kphase[8], int &ncopies) const;
-  // mympi.cpp
-  void boundary_communications(field_type);
   // step.cpp
   void phase_material();
   void step_db(field_type ft);
@@ -2081,6 +2142,31 @@ class fields {
   void am_now_working_on(time_sink);
   void finished_working();
   void reset_timers();
+
+ private:
+  // The following is an array that is num_chunks by num_chunks. Actually
+  // it is two arrays, one for the imaginary and one for the real part.
+  realnum **comm_blocks[NUM_FIELD_TYPES];
+  // Map with all non-zero communication block sizes.
+  std::unordered_map<comms_key, size_t, comms_key_hash_fn> comm_sizes;
+  // The sequence of send and receive operations for each field type.
+  comms_sequence comms_sequence_for_field[NUM_FIELD_TYPES];
+
+  size_t get_comm_size(const comms_key &key) const {
+    auto it = comm_sizes.find(key);
+    return (it != comm_sizes.end()) ? it->second : 0;
+  }
+
+  size_t comm_size_tot(field_type ft, const chunk_pair &pair) const {
+    size_t sum = 0;
+    for (auto ip : all_connect_phases)
+      sum += get_comm_size({ft, ip, pair});
+    return sum;
+  }
+
+  int chunk_pair_to_index(const chunk_pair &pair) const {
+    return pair.first + num_chunks * pair.second;
+  }
 };
 
 class flux_vol {
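With comm_sizes now a sparse unordered_map, chunk pairs that never exchange data simply have no entry, and get_comm_size turns a missing key into zero, which is what lets comm_size_tot stay a plain sum over the three connect phases. A self-contained sketch of the default-zero lookup pattern (stand-in types, not meep's):

    #include <cstdio>
    #include <unordered_map>

    int main() {
      std::unordered_map<int, std::size_t> sizes{{7, 42}}; // sparse: only key 7 present
      auto get = [&](int key) -> std::size_t {
        auto it = sizes.find(key);
        return (it != sizes.end()) ? it->second : 0; // an absent key reads as size 0
      };
      std::printf("%zu %zu\n", get(7), get(8)); // prints "42 0"
      return 0;
    }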
diff --git a/src/meep/vec.hpp b/src/meep/vec.hpp
index 1eca2e9e9..6f1c5ae04 100644
--- a/src/meep/vec.hpp
+++ b/src/meep/vec.hpp
@@ -25,8 +25,8 @@
 
 namespace meep {
 
-const int NUM_FIELD_COMPONENTS = 20;
-const int NUM_FIELD_TYPES = 8;
+constexpr int NUM_FIELD_COMPONENTS = 20;
+constexpr int NUM_FIELD_TYPES = 8;
 
 enum component {
   Ex = 0,
diff --git a/src/mympi.cpp b/src/mympi.cpp
index 814bd10bc..b1c351831 100644
--- a/src/mympi.cpp
+++ b/src/mympi.cpp
@@ -67,10 +67,64 @@ using namespace std;
 
 namespace meep {
 
+namespace {
+
 #ifdef HAVE_MPI
-static MPI_Comm mycomm = MPI_COMM_WORLD;
+MPI_Comm mycomm = MPI_COMM_WORLD;
 #endif
 
+// comms_manager implementation that uses MPI.
+class mpi_comms_manager : public comms_manager {
+public:
+  mpi_comms_manager() {}
+  ~mpi_comms_manager() override {
+#ifdef HAVE_MPI
+    if (!reqs.empty()) {
+      MPI_Waitall(reqs.size(), reqs.data(), MPI_STATUSES_IGNORE);
+    }
+#endif
+  }
+
+  void send_real_async(const void *buf, size_t count, int dest, int tag) override {
+#ifdef HAVE_MPI
+    reqs.emplace_back();
+    MPI_Isend(buf, static_cast<int>(count), MPI_REALNUM, dest, tag, mycomm, &reqs.back());
+#else
+    (void)buf;
+    (void)count;
+    (void)dest;
+    (void)tag;
+#endif
+  }
+
+  void receive_real_async(void *buf, size_t count, int source, int tag) override {
+#ifdef HAVE_MPI
+    reqs.emplace_back();
+    MPI_Irecv(buf, static_cast<int>(count), MPI_REALNUM, source, tag, mycomm, &reqs.back());
+#else
+    (void)buf;
+    (void)count;
+    (void)source;
+    (void)tag;
+#endif
+  }
+
+#ifdef HAVE_MPI
+  size_t max_transfer_size() const override { return std::numeric_limits<int>::max(); }
+#endif
+
+private:
+#ifdef HAVE_MPI
+  std::vector<MPI_Request> reqs;
+#endif
+};
+
+} // namespace
+
+std::unique_ptr<comms_manager> create_comms_manager() {
+  return std::unique_ptr<comms_manager>(new mpi_comms_manager());
+}
+
 int verbosity = 1; // defined in meep.h
 
 initialize::initialize(int &argc, char **&argv) {
@@ -517,53 +571,6 @@ bool with_mpi() {
 #endif
 }
 
-void fields::boundary_communications(field_type ft) {
-  // Communicate the data around!
-#if 0 // This is the blocking version, which should always be safe!
-  for (int noti=0;noti<num_chunks;noti++)
-    for (int j=0;j<num_chunks;j++) {
-      const int i = (noti+j)%num_chunks;
-      const int pair = j+i*num_chunks;
-      if (comm_size_tot(ft,pair) > 0) {
-        send(chunks[j]->n_proc(), chunks[i]->n_proc(),
-             comm_blocks[ft][pair], comm_size_tot(ft,pair));
-      }
-    }
-#endif
-#ifdef HAVE_MPI
-  const int maxreq = num_chunks * num_chunks;
-  MPI_Request *reqs = new MPI_Request[maxreq];
-  MPI_Status *stats = new MPI_Status[maxreq];
-  int reqnum = 0;
-  int *tagto = new int[count_processors()];
-  for (int i = 0; i < count_processors(); i++)
-    tagto[i] = 0;
-  for (int noti = 0; noti < num_chunks; noti++)
-    for (int j = 0; j < num_chunks; j++) {
-      const int i = (noti + j) % num_chunks;
-      const int pair = j + i * num_chunks;
-      const size_t comm_size = comm_size_tot(ft, pair);
-      if (comm_size > 0) {
-        if (comm_size > 2147483647) // MPI uses int for size to send/recv
-          meep::abort("communications size too big for MPI");
-        if (chunks[j]->is_mine() && !chunks[i]->is_mine())
-          MPI_Isend(comm_blocks[ft][pair], (int)comm_size, MPI_REALNUM, chunks[i]->n_proc(),
-                    tagto[chunks[i]->n_proc()]++, mycomm, &reqs[reqnum++]);
-        if (chunks[i]->is_mine() && !chunks[j]->is_mine())
-          MPI_Irecv(comm_blocks[ft][pair], (int)comm_size, MPI_REALNUM, chunks[j]->n_proc(),
-                    tagto[chunks[j]->n_proc()]++, mycomm, &reqs[reqnum++]);
-      }
-    }
-  delete[] tagto;
-  if (reqnum > maxreq) meep::abort("Too many requests!!!\n");
-  if (reqnum > 0) MPI_Waitall(reqnum, reqs, stats);
-  delete[] reqs;
-  delete[] stats;
-#else
-  (void)ft; // unused
-#endif
-}
-
 // IO Routines...
 
 bool am_really_master() { return (my_global_rank() == 0); }
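Because ~mpi_comms_manager() ends with MPI_Waitall over every request it issued, the lifetime of the manager doubles as the completion barrier; step_boundaries below closes a brace scope instead of calling a wait function. Schematic usage only, where exchange, rx_buf, tx_buf, peer, and tag are illustrative placeholders rather than names from the patch:

    #include <cstddef>
    #include <memory>
    #include <meep.hpp>

    void exchange(meep::realnum *rx_buf, const meep::realnum *tx_buf, std::size_t n, int peer,
                  int tag) {
      std::unique_ptr<meep::comms_manager> mgr = meep::create_comms_manager();
      mgr->receive_real_async(rx_buf, n, peer, tag); // post the receive first
      mgr->send_real_async(tx_buf, n, peer, tag);
    } // the destructor waits for both requests before exchange() returns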
diff --git a/src/step.cpp b/src/step.cpp
index 9d452d20b..dabe1df0d 100644
--- a/src/step.cpp
+++ b/src/step.cpp
@@ -15,6 +15,8 @@
 % Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 */
+#include <array>
+#include <map>
 #include <stdio.h>
 #include <string.h>
 #include <math.h>
@@ -133,65 +135,119 @@ void fields_chunk::phase_material(int phasein_time) {
 
 void fields::step_boundaries(field_type ft) {
   connect_chunks(); // re-connect if !chunk_connections_valid
 
-  // Do the metals first!
-  for (int i = 0; i < num_chunks; i++)
-    if (chunks[i]->is_mine()) chunks[i]->zero_metal(ft);
+  {
+    // Initiate receive operations as early as possible.
+    std::unique_ptr<comms_manager> manager = create_comms_manager();
 
-  /* Note that the copying of data to/from buffers is order-sensitive,
-     and must be kept consistent with the code in boundaries.cpp.
-     In particular, we require that boundaries.cpp set up the connections
-     array so that all of the connections for process i come before all
-     of the connections for process i' for i < i' */
+    const auto &sequence = comms_sequence_for_field[ft];
+    for (const comms_operation &op : sequence.receive_ops) {
+      if (chunks[op.other_chunk_idx]->is_mine()) {
+        continue;
+      }
+      manager->receive_real_async(comm_blocks[ft][op.pair_idx],
+                                  static_cast<int>(op.transfer_size), op.other_proc_id, op.tag);
+    }
 
-  // First copy outgoing data to buffers...
-  am_now_working_on(Boundaries);
-  for (int j = 0; j < num_chunks; j++)
-    if (chunks[j]->is_mine()) {
-      int wh[3] = {0, 0, 0};
-      for (int i = 0; i < num_chunks; i++) {
-        const int pair = j + i * num_chunks;
-        size_t n0 = 0;
-        for (int ip = 0; ip < 3; ip++) {
-          for (size_t n = 0; n < comm_sizes[ft][ip][pair]; n++)
-            comm_blocks[ft][pair][n0 + n] = *(chunks[j]->connections[ft][ip][Outgoing][wh[ip]++]);
-          n0 += comm_sizes[ft][ip][pair];
+    // Do the metals first!
+    for (int i = 0; i < num_chunks; i++)
+      if (chunks[i]->is_mine()) chunks[i]->zero_metal(ft);
+
+    /* Note that the copying of data to/from buffers is order-sensitive,
+       and must be kept consistent with the code in boundaries.cpp.
+       In particular, we require that boundaries.cpp set up the connections
+       array so that all of the connections for process i come before all
+       of the connections for process i' for i < i' */
+
+    // Copy outgoing data into buffers while following the predefined sequence of comms
+    // operations. Trigger the asynchronous send immediately once the outgoing comms buffer
+    // has been filled.
+    using offset_array = std::array<ptrdiff_t, NUM_CONNECT_PHASE_TYPES>;
+    std::map<ptrdiff_t, offset_array> connection_offset_by_chunk;
+    am_now_working_on(Boundaries);
+
+    for (const comms_operation &op : sequence.send_ops) {
+      if (!connection_offset_by_chunk.count(op.my_chunk_idx)) {
+        connection_offset_by_chunk.emplace(std::make_pair(op.my_chunk_idx, offset_array{}));
+      }
+      const std::pair<int, int> comm_pair{op.my_chunk_idx, op.other_chunk_idx};
+      const int pair_idx = op.pair_idx;
+      size_t n0 = 0;
+
+      for (connect_phase ip : all_connect_phases) {
+        const size_t pair_comm_size = get_comm_size({ft, ip, comm_pair});
+        ptrdiff_t &connection_offset = connection_offset_by_chunk[op.my_chunk_idx][ip];
+        for (size_t n = 0; n < pair_comm_size; ++n) {
+          comm_blocks[ft][pair_idx][n0 + n] =
+              *(chunks[op.my_chunk_idx]->connections[ft][ip][Outgoing][connection_offset++]);
         }
+        n0 += pair_comm_size;
+      }
+      if (chunks[op.other_chunk_idx]->is_mine()) {
+        continue;
       }
+      manager->send_real_async(comm_blocks[ft][pair_idx], static_cast<int>(op.transfer_size),
+                               op.other_proc_id, op.tag);
     }
-  finished_working();
+    finished_working();
 
-  am_now_working_on(MpiOneTime);
-  boundary_communications(ft);
+    am_now_working_on(MpiOneTime);
+    // Let the communication manager drop out of scope to complete all outstanding requests.
+  }
   finished_working();
 
   // Finally, copy incoming data to the fields themselves, multiplying phases:
   am_now_working_on(Boundaries);
-  for (int i = 0; i < num_chunks; i++)
-    if (chunks[i]->is_mine()) {
-      int wh[3] = {0, 0, 0};
-      for (int j = 0; j < num_chunks; j++) {
-        const int pair = j + i * num_chunks;
-        connect_phase ip = CONNECT_PHASE;
-        for (size_t n = 0; n < comm_sizes[ft][ip][pair]; n += 2, wh[ip] += 2) {
-          const double phr = real(chunks[i]->connection_phases[ft][wh[ip] / 2]);
-          const double phi = imag(chunks[i]->connection_phases[ft][wh[ip] / 2]);
-          *(chunks[i]->connections[ft][ip][Incoming][wh[ip]]) =
-              phr * comm_blocks[ft][pair][n] - phi * comm_blocks[ft][pair][n + 1];
-          *(chunks[i]->connections[ft][ip][Incoming][wh[ip] + 1]) =
-              phr * comm_blocks[ft][pair][n + 1] + phi * comm_blocks[ft][pair][n];
+  for (int i = 0; i < num_chunks; i++) {
+    if (!chunks[i]->is_mine()) continue;
+
+    ptrdiff_t connection_phase_offset = 0;
+    ptrdiff_t negate_phase_offset = 0;
+    ptrdiff_t copy_phase_offset = 0;
+    const std::complex<realnum> *connection_phase_for_ft = chunks[i]->connection_phases[ft];
+
+    for (int j = 0; j < num_chunks; j++) {
+      const chunk_pair pair{j, i};
+      const int pair_idx = chunk_pair_to_index(pair);
+      const realnum *pair_comm_block = static_cast<const realnum *>(comm_blocks[ft][pair_idx]);
+
+      {
+        const std::complex<realnum> *pair_comm_block_complex =
+            reinterpret_cast<const std::complex<realnum> *>(pair_comm_block);
+        const connect_phase ip = CONNECT_PHASE;
+        realnum **dst = chunks[i]->connections[ft][ip][Incoming];
+        size_t num_transfers = get_comm_size({ft, ip, pair}) / 2; // Two realnums per complex
+
+        for (size_t n = 0; n < num_transfers; ++n) {
+          std::complex<realnum> temp =
+              connection_phase_for_ft[connection_phase_offset + n] * pair_comm_block_complex[n];
+          *(dst[2 * (connection_phase_offset + n)]) = temp.real();
+          *(dst[2 * (connection_phase_offset + n) + 1]) = temp.imag();
         }
-        size_t n0 = comm_sizes[ft][ip][pair];
-        ip = CONNECT_NEGATE;
-        for
 (size_t n = 0; n < comm_sizes[ft][ip][pair]; ++n)
-          *(chunks[i]->connections[ft][ip][Incoming][wh[ip]++]) = -comm_blocks[ft][pair][n0 + n];
-        n0 += comm_sizes[ft][ip][pair];
-        ip = CONNECT_COPY;
-        for (size_t n = 0; n < comm_sizes[ft][ip][pair]; ++n)
-          *(chunks[i]->connections[ft][ip][Incoming][wh[ip]++]) = comm_blocks[ft][pair][n0 + n];
+        connection_phase_offset += num_transfers;
+        pair_comm_block += 2 * num_transfers;
+      }
+
+      {
+        const connect_phase ip = CONNECT_NEGATE;
+        const size_t num_transfers = get_comm_size({ft, ip, pair});
+        realnum **dst = chunks[i]->connections[ft][ip][Incoming];
+        for (size_t n = 0; n < num_transfers; ++n) {
+          *(dst[negate_phase_offset + n]) = -pair_comm_block[n];
+        }
+        negate_phase_offset += num_transfers;
+        pair_comm_block += num_transfers;
+      }
+
+      {
+        const connect_phase ip = CONNECT_COPY;
+        const size_t num_transfers = get_comm_size({ft, ip, pair});
+        realnum **dst = chunks[i]->connections[ft][ip][Incoming];
+        for (size_t n = 0; n < num_transfers; ++n) {
+          *(dst[copy_phase_offset + n]) = pair_comm_block[n];
+        }
+        copy_phase_offset += num_transfers;
+      }
-      }
-    }
+    }
+  }
 
   finished_working();
 }
 
 void fields::step_source(field_type ft, bool including_integrated) {
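One correctness note on the CONNECT_PHASE branch above: reinterpreting the comm block as std::complex<realnum> and using operator* reproduces exactly the phr/phi arithmetic of the deleted loop, since (pr + i*pi)(a + i*b) = (pr*a - pi*b) + i(pr*b + pi*a). A standalone check with illustrative values:

    #include <cassert>
    #include <complex>

    int main() {
      const std::complex<double> phase(0.5, 0.25), value(2.0, -4.0);
      const std::complex<double> temp = phase * value;
      const double phr = phase.real(), phi = phase.imag();
      // The two expressions from the deleted CONNECT_PHASE loop:
      assert(temp.real() == phr * value.real() - phi * value.imag());
      assert(temp.imag() == phr * value.imag() + phi * value.real());
      return 0;
    }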