Timestamp analyses to investigate the impact of overlaps with executors #26

Merged 3 commits on Jul 12, 2023
12 changes: 11 additions & 1 deletion lib/utils/io_utils.hpp
@@ -43,14 +43,24 @@ namespace Impl {
auto key = d.first;
auto value = d.second;
if(index) file << key << separator;


for(std::size_t i=0; i<value.size(); i++) {
if(i != value.size()-1) {
file << value[i] << separator;
} else {
file << value[i] << std::endl;
}
}

/*
for(auto v: value) {
if(v != value.back()) {
file << v << separator;
} else {
file << v << std::endl;
}
}
*/
}
}

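The io_utils.hpp hunk replaces the commented-out range-based loop with an index-based loop: comparing each element against value.back() ends the line too early whenever the last value also occurs earlier in the vector, while indexing only treats the final position specially. A minimal, self-contained sketch of the resulting writer follows; the map/ofstream types and the write_csv_rows name are illustrative stand-ins, not taken from the repository.

#include <fstream>
#include <map>
#include <string>
#include <vector>

// Illustrative stand-in for the writer in io_utils.hpp (names are hypothetical).
inline void write_csv_rows(std::ofstream& file,
                           const std::map<std::string, std::vector<double>>& dict,
                           const std::string& separator = ",",
                           bool index = true) {
  for (const auto& d : dict) {
    const auto& key   = d.first;
    const auto& value = d.second;
    if (index) file << key << separator;

    // Only the last position terminates the line, so a duplicated final value
    // cannot trigger an early newline the way `v != value.back()` could.
    for (std::size_t i = 0; i < value.size(); i++) {
      if (i != value.size() - 1) {
        file << value[i] << separator;
      } else {
        file << value[i] << std::endl;
      }
    }
  }
}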
1 change: 1 addition & 0 deletions mini-apps/lbm2d-letkf/config.hpp
@@ -70,6 +70,7 @@ struct Settings {
bool is_reference_ = true; // false for DA cases
bool is_async_ = false; // In order to enable overlapping, in senders/receivers version of letkf
bool is_bcast_on_host_ = false; // broadcast on device or host
bool use_time_stamps_ = false; // for detailed analysis
double ly_epsilon_ = 1.e-8;

// data assimilation parameter
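The new use_time_stamps_ switch in config.hpp gates the extra timestamp collection used for the detailed overlap analysis. The sketch below only illustrates how such a flag might be consulted; TimeStampRecorder and its members are hypothetical and do not appear in this diff.

#include <chrono>
#include <vector>

// Hypothetical recorder gated by a use_time_stamps-style flag (illustration only).
struct TimeStampRecorder {
  bool enabled = false;  // would mirror settings.use_time_stamps_
  std::vector<std::chrono::high_resolution_clock::time_point> stamps;

  void record() {
    if (!enabled) return;  // negligible overhead when the detailed analysis is off
    stamps.push_back(std::chrono::high_resolution_clock::now());
  }
};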
221 changes: 56 additions & 165 deletions mini-apps/lbm2d-letkf/executors/letkf.hpp
@@ -118,11 +118,11 @@ class LETKF : public DA_Model {
auto io_scheduler = io_thread_pool.get_scheduler();
auto _load = stdexec::just() |
stdexec::then([&]{
timers[DA_Load]->begin();
if(mpi_conf_.is_master()) {
timers[DA_Load]->begin();
load(data_vars, it);
timers[DA_Load]->end();
}
timers[DA_Load]->end();
});

timers[TimerEnum::DA]->begin();
@@ -134,17 +134,17 @@
auto xk_buffer = xk_buffer_.mdspan();
auto X = letkf_solver_->X().mdspan();

timers[DA_Set_Matrix]->begin();
timers[DA_Pack_X]->begin();
Impl::transpose(blas_handle_, f, xk, {2, 0, 1}); // (nx, ny, Q) -> (Q, nx*ny)
timers[DA_Set_Matrix]->end();
timers[DA_Pack_X]->end();

timers[DA_All2All]->begin();
timers[DA_All2All_X]->begin();
all2all(xk, xk_buffer); // xk(n_stt, n_batch, n_ens) -> xk_buffer(n_stt, n_batch, n_ens)
timers[DA_All2All]->end();
timers[DA_All2All_X]->end();

timers[DA_Set_Matrix]->begin();
timers[DA_Unpack_X]->begin();
Impl::transpose(blas_handle_, xk_buffer, X, {0, 2, 1});
timers[DA_Set_Matrix]->end();
timers[DA_Unpack_X]->end();

// set Y
auto yk = yk_.mdspan();
@@ -159,57 +159,42 @@
const int y_offset0 = 0;
auto _yk = Impl::reshape(yk, std::array<std::size_t, 3>({n_obs_x_*n_obs_x_, 3, nx*ny}));
Iterate_policy<4> yk_pack_policy4d({0, 0, 0, 0}, {n_obs_x_, n_obs_x_, nx, ny});
timers[DA_Set_Matrix]->begin();
timers[DA_Pack_Y]->begin();
Impl::for_each(yk_pack_policy4d, pack_y_functor(conf_, y_offset0, rho, u, v, _yk));
timers[DA_Set_Matrix]->end();
timers[DA_Pack_Y]->end();

timers[DA_All2All]->begin();
timers[DA_All2All_Y]->begin();
all2all(yk, yk_buffer); // yk(n_obs, n_batch, n_ens) -> yk_buffer(n_obs, n_batch, n_ens)
timers[DA_All2All]->end();
timers[DA_All2All_Y]->end();

timers[DA_Set_Matrix]->begin();
timers[DA_Unpack_Y]->begin();
Impl::transpose(blas_handle_, yk_buffer, Y, {0, 2, 1}); // (n_obs, n_batch, n_ens) -> (n_obs, n_ens, n_batch)
timers[DA_Set_Matrix]->end();
timers[DA_Unpack_Y]->end();

stdexec::sync_wait( scope.on_empty() );

auto _axpy = letkf_solver_->solve_axpy_sender(scheduler);
if(mpi_conf_.is_master()) {
if(!load_to_device_) {
if(!load_to_device_) {
timers[DA_Load_H2D]->begin();
if(mpi_conf_.is_master()) {
data_vars->rho_obs().updateDevice();
data_vars->u_obs().updateDevice();
data_vars->v_obs().updateDevice();
}
timers[DA_Load_H2D]->end();
}

// set yo
auto _broadcast = stdexec::just() |
stdexec::then([&]{
if(load_to_device_) {
auto rho_obs = data_vars->rho_obs().mdspan();
auto u_obs = data_vars->u_obs().mdspan();
auto v_obs = data_vars->v_obs().mdspan();
timers[DA_Broadcast]->begin();
broadcast(rho_obs);
broadcast(u_obs);
broadcast(v_obs);
timers[DA_Broadcast]->end();
} else {
auto rho_obs = data_vars->rho_obs().host_mdspan();
auto u_obs = data_vars->u_obs().host_mdspan();
auto v_obs = data_vars->v_obs().host_mdspan();
timers[DA_Broadcast]->begin();
broadcast(rho_obs);
broadcast(u_obs);
broadcast(v_obs);
timers[DA_Broadcast]->end();

timers[DA_Load_H2D]->begin();
data_vars->rho_obs().updateDevice();
data_vars->u_obs().updateDevice();
data_vars->v_obs().updateDevice();
timers[DA_Load_H2D]->end();
}
auto rho_obs = data_vars->rho_obs().mdspan();
auto u_obs = data_vars->u_obs().mdspan();
auto v_obs = data_vars->v_obs().mdspan();
timers[DA_Broadcast]->begin();
broadcast(rho_obs);
broadcast(u_obs);
broadcast(v_obs);
timers[DA_Broadcast]->end();
});

auto _axpy_and_braodcast = stdexec::when_all(
@@ -231,36 +216,6 @@ class LETKF : public DA_Model {
timers[TimerEnum::DA]->end();
}

void packX(std::unique_ptr<DataVars>& data_vars, std::vector<Timer*>& timers) {
// Pack X
const auto f = data_vars->f().mdspan();
auto xk = xk_.mdspan();

timers[DA_Set_Matrix]->begin();
Impl::transpose(blas_handle_, f, xk, {2, 0, 1});
timers[DA_Set_Matrix]->end();
}

void unpackX(std::unique_ptr<DataVars>& data_vars, std::vector<Timer*>& timers) {
// set X
auto xk_buffer = xk_buffer_.mdspan();
auto X = letkf_solver_->X().mdspan();

timers[DA_Set_Matrix]->begin();
Impl::transpose(blas_handle_, xk_buffer, X, {0, 2, 1});
timers[DA_Set_Matrix]->end();
}

void unpackY(std::unique_ptr<DataVars>& data_vars, std::vector<Timer*>& timers) {
// set Y
auto yk_buffer = yk_buffer_.mdspan();
auto Y = letkf_solver_->Y().mdspan();

timers[DA_Set_Matrix]->begin();
Impl::transpose(blas_handle_, yk_buffer, Y, {0, 2, 1}); // (n_obs, n_batch, n_ens) -> (n_obs, n_ens, n_batch)
timers[DA_Set_Matrix]->end();
}

void setyo(std::unique_ptr<DataVars>& data_vars, std::vector<Timer*>& timers) {
// set yo
auto [nx, ny] = conf_.settings_.n_;
@@ -274,73 +229,20 @@ class LETKF : public DA_Model {
auto _y_obs = Impl::reshape(y_obs, std::array<std::size_t, 3>({n_obs_x_*n_obs_x_, 3, nx*ny_local}));
Iterate_policy<4> yo_pack_policy4d({0, 0, 0, 0}, {n_obs_x_, n_obs_x_, nx, ny_local});

timers[DA_Set_Matrix]->begin();
timers[DA_Pack_Obs]->begin();
Impl::for_each(yo_pack_policy4d, pack_y_functor(conf_, y_offset, rho_obs, u_obs, v_obs, _y_obs));
timers[DA_Set_Matrix]->end();
}

template <class Sender, class Scheduler>
stdexec::sender auto packY_sender(Sender&& sender, Scheduler&& scheduler, std::unique_ptr<DataVars>& data_vars) {
// Pack Y
auto yk = yk_.mdspan();

auto [nx, ny] = conf_.settings_.n_;
auto rho = data_vars->rho().mdspan();
auto u = data_vars->u().mdspan();
auto v = data_vars->v().mdspan();

const int y_offset0 = 0;
const std::size_t size = n_obs_x_ * n_obs_x_ * nx * ny;
auto _yk = Impl::reshape(yk, std::array<std::size_t, 3>({n_obs_x_*n_obs_x_, 3, nx*ny}));
auto f = pack_y_functor(conf_, y_offset0, rho, u, v, _yk);
int n0 = n_obs_x_, n1 = n_obs_x_, n2 = nx, n3 = ny;
auto functor_1d = [=] MDSPAN_FORCE_INLINE_FUNCTION (const int idx) {
if(std::is_same_v<default_iterate_layout, stdex::layout_left>) {
const int i0 = idx % n0;
const int i123 = idx / n0;
const int i1 = i123%n1;
const int i23 = i123/n1;
const int i2 = i23%n2;
const int i3 = i23/n2;
f(i0, i1, i2, i3);
} else {
const int i3 = idx % n3;
const int i012 = idx / n3;
const int i2 = i012%n2;
const int i01 = i012/n2;
const int i1 = i01%n1;
const int i0 = i01/n1;
f(i0, i1, i2, i3);
}
};
return sender | exec::on(scheduler, stdexec::bulk(size, functor_1d));
}

template <class Sender>
stdexec::sender auto all2all_sender(Sender&& sender, std::unique_ptr<DataVars>& data_vars) {
auto xk = xk_.mdspan();
auto xk_buffer = xk_buffer_.mdspan();

auto yk = yk_.mdspan();
auto yk_buffer = yk_buffer_.mdspan();

return sender | stdexec::then( [&] {
all2all(xk, xk_buffer); // xk(n_stt, n_batch, n_ens) -> xk_buffer(n_stt, n_batch, n_ens)
all2all(yk, yk_buffer); // yk(n_obs, n_batch, n_ens) -> yk_buffer(n_obs, n_batch, n_ens)
});
timers[DA_Pack_Obs]->end();
}

private:
// Conventional implementation with thrust
void apply_sync(std::unique_ptr<DataVars>& data_vars, const int it, std::vector<Timer*>& timers) {
timers[TimerEnum::DA]->begin();
timers[DA_Load]->begin();
if(mpi_conf_.is_master()) {
std::cout << __PRETTY_FUNCTION__ << ": t=" << it << std::endl;

timers[DA_Load]->begin();
load(data_vars, it);
timers[DA_Load]->end();
}
timers[DA_Load]->end();
setXandY(data_vars, timers);

timers[DA_LETKF]->begin();
@@ -366,17 +268,17 @@ class LETKF : public DA_Model {
auto xk_buffer = xk_buffer_.mdspan();
auto X = letkf_solver_->X().mdspan();

timers[DA_Set_Matrix]->begin();
timers[DA_Pack_X]->begin();
Impl::transpose(blas_handle_, f, xk, {2, 0, 1}); // (nx, ny, Q) -> (Q, nx*ny)
timers[DA_Set_Matrix]->end();
timers[DA_Pack_X]->end();

timers[DA_All2All]->begin();
timers[DA_All2All_X]->begin();
all2all(xk, xk_buffer); // xk(n_stt, n_batch, n_ens) -> xk_buffer(n_stt, n_batch, n_ens)
timers[DA_All2All]->end();
timers[DA_All2All_X]->end();

timers[DA_Set_Matrix]->begin();
timers[DA_Unpack_X]->begin();
Impl::transpose(blas_handle_, xk_buffer, X, {0, 2, 1});
timers[DA_Set_Matrix]->end();
timers[DA_Unpack_X]->end();

// set Y
auto yk = yk_.mdspan();
@@ -391,58 +293,47 @@ class LETKF : public DA_Model {
const int y_offset0 = 0;
auto _yk = Impl::reshape(yk, std::array<std::size_t, 3>({n_obs_x_*n_obs_x_, 3, nx*ny}));
Iterate_policy<4> yk_pack_policy4d({0, 0, 0, 0}, {n_obs_x_, n_obs_x_, nx, ny});
timers[DA_Set_Matrix]->begin();
timers[DA_Pack_Y]->begin();
Impl::for_each(yk_pack_policy4d, pack_y_functor(conf_, y_offset0, rho, u, v, _yk));
timers[DA_Set_Matrix]->end();
timers[DA_Pack_Y]->end();

timers[DA_All2All]->begin();
timers[DA_All2All_Y]->begin();
all2all(yk, yk_buffer); // yk(n_obs, n_batch, n_ens) -> yk_buffer(n_obs, n_batch, n_ens)
timers[DA_All2All]->end();
timers[DA_All2All_Y]->end();

timers[DA_Set_Matrix]->begin();
timers[DA_Unpack_Y]->begin();
Impl::transpose(blas_handle_, yk_buffer, Y, {0, 2, 1}); // (n_obs, n_batch, n_ens) -> (n_obs, n_ens, n_batch)
timers[DA_Set_Matrix]->end();
timers[DA_Unpack_Y]->end();

// set yo
if(load_to_device_) {
auto rho_obs = data_vars->rho_obs().mdspan();
auto u_obs = data_vars->u_obs().mdspan();
auto v_obs = data_vars->v_obs().mdspan();
timers[DA_Broadcast]->begin();
broadcast(rho_obs);
broadcast(u_obs);
broadcast(v_obs);
timers[DA_Broadcast]->end();
} else {
auto rho_obs = data_vars->rho_obs().host_mdspan();
auto u_obs = data_vars->u_obs().host_mdspan();
auto v_obs = data_vars->v_obs().host_mdspan();
timers[DA_Broadcast]->begin();
broadcast(rho_obs);
broadcast(u_obs);
broadcast(v_obs);
timers[DA_Broadcast]->end();

if(!load_to_device_) {
timers[DA_Load_H2D]->begin();
data_vars->rho_obs().updateDevice();
data_vars->u_obs().updateDevice();
data_vars->v_obs().updateDevice();
if(mpi_conf_.is_master()) {
data_vars->rho_obs().updateDevice();
data_vars->u_obs().updateDevice();
data_vars->v_obs().updateDevice();
}
timers[DA_Load_H2D]->end();
}

auto rho_obs = data_vars->rho_obs().mdspan();
auto u_obs = data_vars->u_obs().mdspan();
auto v_obs = data_vars->v_obs().mdspan();
auto y_obs = letkf_solver_->y_obs().mdspan();
timers[DA_Broadcast]->begin();
broadcast(rho_obs);
broadcast(u_obs);
broadcast(v_obs);
timers[DA_Broadcast]->end();

const int ny_local = ny/mpi_conf_.size();
const int y_offset = ny_local * mpi_conf_.rank();
auto y_obs = letkf_solver_->y_obs().mdspan();
auto _y_obs = Impl::reshape(y_obs, std::array<std::size_t, 3>({n_obs_x_*n_obs_x_, 3, nx*ny_local}));
Iterate_policy<4> yo_pack_policy4d({0, 0, 0, 0}, {n_obs_x_, n_obs_x_, nx, ny_local});

timers[DA_Set_Matrix]->begin();
timers[DA_Pack_Obs]->begin();
Impl::for_each(yo_pack_policy4d, pack_y_functor(conf_, y_offset, rho_obs, u_obs, v_obs, _y_obs));
timers[DA_Set_Matrix]->end();
timers[DA_Pack_Obs]->end();
}

void update(std::unique_ptr<DataVars>& data_vars) {
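In both apply_async and apply_sync, the DA_Load, DA_Load_H2D, and DA_Broadcast begin()/end() calls move outside the if(mpi_conf_.is_master()) blocks, so every rank records the interval and the per-rank timestamps can be lined up against the executor overlap. Below is a self-contained sketch of that pattern under stated assumptions; StubTimer and load_step are stand-ins, not the repository's Timer class or LETKF methods.

#include <chrono>

// Stand-in timer with the begin()/end() interface used throughout letkf.hpp.
struct StubTimer {
  std::chrono::high_resolution_clock::time_point t0;
  double seconds = 0.0;
  void begin() { t0 = std::chrono::high_resolution_clock::now(); }
  void end() {
    seconds += std::chrono::duration<double>(
                   std::chrono::high_resolution_clock::now() - t0).count();
  }
};

// After this PR, the bracketing happens on every rank: ranks that merely wait
// alongside the master-only block still produce a timestamp for the analysis.
void load_step(bool is_master, StubTimer& da_load_timer) {
  da_load_timer.begin();
  if (is_master) {
    // load(data_vars, it);  // master-only work, elided here
  }
  da_load_timer.end();
}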
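The other change in letkf.hpp splits the coarse DA_Set_Matrix and DA_All2All measurements into per-phase labels (DA_Pack_X, DA_All2All_X, DA_Unpack_X, DA_Pack_Y, DA_All2All_Y, DA_Unpack_Y, DA_Pack_Obs), so each pack, communicate, and unpack step gets its own timestamp. The enum below only restates the labels visible in this diff as a reading aid; the real enum lives elsewhere in the repository, and its full membership and ordering are assumed.

// Assumed shape of the timer-label enum; only labels visible in this diff are
// listed, and their ordering and completeness are not guaranteed.
enum TimerLabel {
  DA_Load,
  DA_Load_H2D,
  DA_Broadcast,
  DA_Pack_X,     // formerly counted under DA_Set_Matrix
  DA_All2All_X,  // formerly counted under DA_All2All
  DA_Unpack_X,   // formerly counted under DA_Set_Matrix
  DA_Pack_Y,
  DA_All2All_Y,
  DA_Unpack_Y,
  DA_Pack_Obs,
  DA_LETKF
};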