Skip to content

Commit

Permalink
Update code to work with the latest versions of abseil and BS::thread…
Browse files Browse the repository at this point in the history
…_pool
  • Loading branch information
robomics committed May 6, 2024
1 parent 53d1d1b commit e73c0eb
Show file tree
Hide file tree
Showing 13 changed files with 215 additions and 194 deletions.
4 changes: 2 additions & 2 deletions src/contact_matrix/contact_matrix_dense_safe_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ ContactMatrixDense<double> ContactMatrixDense<N>::blur(const double sigma, const

const auto lck = this->lock();
if (tpool) {
auto fut = tpool->template parallelize_loop(usize(0), this->ncols(), apply_kernel);
auto fut = tpool->template submit_blocks(usize(0), this->ncols(), apply_kernel);
fut.wait();
} else {
apply_kernel(0, this->ncols());
Expand Down Expand Up @@ -220,7 +220,7 @@ ContactMatrixDense<double> ContactMatrixDense<N>::diff_of_gaussians(

const auto lck = this->lock();
if (tpool) {
auto fut = tpool->template parallelize_loop(usize(0), this->ncols(), compute_gauss_diff);
auto fut = tpool->template submit_blocks(usize(0), this->ncols(), compute_gauss_diff);
fut.wait();
} else {
compute_gauss_diff(0, this->ncols());
Expand Down
8 changes: 4 additions & 4 deletions src/libmodle/cpu/context_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,13 @@ inline usize ContextManager<Task>::num_io_threads() const noexcept {
template <typename Task>
template <typename TaskLambda>
inline void ContextManager<Task>::spawn_worker_thread(TaskLambda lambda) {
this->_futures.emplace_back(this->_worker_tpool.submit(lambda));
this->_futures.emplace_back(this->_worker_tpool.submit_task(lambda));
}

template <typename Task>
template <typename TaskLambda>
inline void ContextManager<Task>::spawn_io_thread(TaskLambda lambda) {
this->_futures.emplace_back(this->_io_tpool.submit(lambda));
this->_futures.emplace_back(this->_io_tpool.submit_task(lambda));
}

template <typename Task>
Expand All @@ -212,9 +212,9 @@ inline void ContextManager<Task>::shutdown() {
this->close_queue();

spdlog::debug(FMT_STRING("waiting for worker threads to return..."));
this->_worker_tpool.wait_for_tasks();
this->_worker_tpool.wait();
spdlog::debug(FMT_STRING("waiting for io threads to return..."));
this->_io_tpool.wait_for_tasks();
this->_io_tpool.wait();

spdlog::debug(
FMT_STRING("all background threads returned! Checking if any exception have been raised..."));
Expand Down
16 changes: 10 additions & 6 deletions src/libmodle/cpu/include/modle/simulation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class Simulation {
struct Task { // NOLINT(altera-struct-pack-align)
enum class Status : u8f { PENDING, RUNNING, COMPLETED, FAILED };
usize id{};
GenomicInterval* interval{};
const GenomicInterval* interval{};
GenomicIntervalData* interval_data{};
usize cell_id{};
usize num_target_epochs{};
usize num_target_contacts{};
Expand Down Expand Up @@ -336,18 +337,21 @@ class Simulation {
/// Register contacts for chromosome \p interval using the position the extrusion units of the
/// LEFs in \p lefs whose index is present in \p selected_lef_idx.
/// Return the actual number of contacts registered
usize register_contacts_loop(GenomicInterval& interval, absl::Span<const Lef> lefs,
usize num_sampling_events, random::PRNG_t& rand_eng) const;
usize register_contacts_loop(const GenomicInterval& interval, GenomicIntervalData& interval_data,
absl::Span<const Lef> lefs, usize num_sampling_events,
random::PRNG_t& rand_eng) const;
usize register_contacts_loop(bp_t start_pos, bp_t end_pos,
ContactMatrixDense<contacts_t>& contacts, absl::Span<const Lef> lefs,
usize num_sampling_events, random::PRNG_t& rand_eng) const;
usize register_contacts_tad(GenomicInterval& interval, absl::Span<const Lef> lefs,
usize num_sampling_events, random::PRNG_t& rand_eng) const;
usize register_contacts_tad(const GenomicInterval& interval, GenomicIntervalData& interval_data,
absl::Span<const Lef> lefs, usize num_sampling_events,
random::PRNG_t& rand_eng) const;
usize register_contacts_tad(bp_t start_pos, bp_t end_pos,
ContactMatrixDense<contacts_t>& contacts, absl::Span<const Lef> lefs,
usize num_sampling_events, random::PRNG_t& rand_eng) const;

usize register_1d_lef_occupancy(GenomicInterval& interval, absl::Span<const Lef> lefs,
usize register_1d_lef_occupancy(const GenomicInterval& interval,
GenomicIntervalData& interval_data, absl::Span<const Lef> lefs,
usize num_sampling_events, random::PRNG_t& rand_eng) const;

usize register_1d_lef_occupancy(bp_t start_pos, bp_t end_pos,
Expand Down
38 changes: 22 additions & 16 deletions src/libmodle/cpu/register_contacts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,14 @@ void Simulation::sample_and_register_contacts(State& s, usize num_sampling_event
const auto num_tad_contacts = num_sampling_events - num_loop_contacts;

assert(s.interval);
s.num_contacts +=
this->register_contacts_loop(*s.interval, s.get_lefs(), num_loop_contacts, s.rand_eng);
s.num_contacts +=
this->register_contacts_tad(*s.interval, s.get_lefs(), num_tad_contacts, s.rand_eng);
s.num_contacts += this->register_contacts_loop(*s.interval, *s.interval_data, s.get_lefs(),
num_loop_contacts, s.rand_eng);
s.num_contacts += this->register_contacts_tad(*s.interval, *s.interval_data, s.get_lefs(),
num_tad_contacts, s.rand_eng);

if (c().track_1d_lef_position) {
this->register_1d_lef_occupancy(*s.interval, s.get_lefs(), num_sampling_events, s.rand_eng);
this->register_1d_lef_occupancy(*s.interval, *s.interval_data, s.get_lefs(),
num_sampling_events, s.rand_eng);
}

assert(s.num_contacts <= s.num_target_contacts);
Expand Down Expand Up @@ -233,26 +234,31 @@ usize Simulation::register_1d_lef_occupancy(bp_t start_pos, bp_t end_pos,
return num_successful_sampling_events;
}

usize Simulation::register_contacts_loop(GenomicInterval& interval,
usize Simulation::register_contacts_loop(const GenomicInterval& interval,
GenomicIntervalData& interval_data,
const absl::Span<const Lef> lefs,
usize num_sampling_events,
random::PRNG_t& rand_eng) const {
return this->register_contacts_loop(interval.start() + 1, interval.end() - 1, interval.contacts(),
lefs, num_sampling_events, rand_eng);
return this->register_contacts_loop(interval.start() + 1, interval.end() - 1,
interval_data.contacts(), lefs, num_sampling_events,
rand_eng);
}

usize Simulation::register_contacts_tad(GenomicInterval& interval, absl::Span<const Lef> lefs,
usize num_sampling_events, random::PRNG_t& rand_eng) const {
return this->register_contacts_tad(interval.start() + 1, interval.end() - 1, interval.contacts(),
lefs, num_sampling_events, rand_eng);
usize Simulation::register_contacts_tad(const GenomicInterval& interval,
GenomicIntervalData& interval_data,
absl::Span<const Lef> lefs, usize num_sampling_events,
random::PRNG_t& rand_eng) const {
return this->register_contacts_tad(interval.start() + 1, interval.end() - 1,
interval_data.contacts(), lefs, num_sampling_events, rand_eng);
}

usize Simulation::register_1d_lef_occupancy(GenomicInterval& interval, absl::Span<const Lef> lefs,
usize num_sampling_events,
usize Simulation::register_1d_lef_occupancy(const GenomicInterval& interval,
GenomicIntervalData& interval_data,
absl::Span<const Lef> lefs, usize num_sampling_events,
random::PRNG_t& rand_eng) const {
return this->register_1d_lef_occupancy(interval.start() + 1, interval.end() - 1,
interval.lef_1d_occupancy(), lefs, num_sampling_events,
rand_eng);
interval_data.lef_1d_occupancy(), lefs,
num_sampling_events, rand_eng);
}

} // namespace modle
7 changes: 4 additions & 3 deletions src/libmodle/cpu/scheduler_simulate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ void Simulation::run_simulate() {
std::unique_ptr<XXH3_state_t, utils::XXH3_Deleter> xxh_state{XXH3_createState()};

// Loop over chromosomes
for (auto& interval : this->_genome) {
for (auto& [interval, interval_data] : this->_genome) {
this->_ctx.check_exceptions();
auto sleep_time = std::chrono::microseconds(50);

auto rand_eng = random::PRNG(interval.hash(*xxh_state, c().seed));

// Don't bother simulating intervals without barriers
if (!c().simulate_chromosomes_wo_barriers && interval.num_barriers() == 0) {
if (!c().simulate_chromosomes_wo_barriers && interval_data.num_barriers() == 0) {
spdlog::info(FMT_STRING("{} has 0 barriers... SKIPPING!"), interval);
Task t{};
t.interval = &interval;
Expand All @@ -128,7 +128,7 @@ void Simulation::run_simulate() {
// Compute # of LEFs to be simulated based on interval.sizes
const auto nlefs = this->compute_num_lefs(interval.size());

const auto npixels = interval.npixels();
const auto npixels = interval_data.npixels();
const auto tot_target_contacts =
static_cast<usize>(std::round(static_cast<double>(npixels) * c().target_contact_density));

Expand All @@ -144,6 +144,7 @@ void Simulation::run_simulate() {

Task t{taskid++,
&interval,
&interval_data,
cellid,
c().target_simulation_epochs,
num_target_contacts,
Expand Down
61 changes: 34 additions & 27 deletions src/libmodle/cpu/simulation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ namespace modle {

static void override_extrusion_barrier_occupancy(Genome& genome, const double barrier_occupied_stp,
const double barrier_not_occupied_stp) {
for (auto& interval : genome) {
auto& barriers = interval.barriers();
for (auto& [interval, interval_data] : genome) {
auto& barriers = interval_data.barriers();
std::transform(barriers.begin(), barriers.end(), barriers.begin(), [&](const auto& barrier) {
return ExtrusionBarrier{barrier.pos, barrier_occupied_stp, barrier_not_occupied_stp,
barrier.blocking_direction.complement()};
Expand All @@ -60,7 +60,7 @@ static void override_extrusion_barrier_occupancy(Genome& genome, const double ba

static void issue_warnings_for_small_intervals(const Genome& genome, const bp_t diagonal_width) {
std::vector<std::string> warnings;
for (const auto& interval : genome) {
for (const auto& [interval, interval_data] : genome) {
if (interval.size() < diagonal_width) {
warnings.emplace_back(fmt::format(FMT_STRING("{}: {};"), interval, interval.size()));
}
Expand All @@ -75,8 +75,8 @@ static void issue_warnings_for_small_intervals(const Genome& genome, const bp_t
template <usize num_pixels_threshold = 250'000>
static void issue_warnings_for_small_matrices(const Genome& genome) {
std::vector<std::string> warnings;
for (const auto& interval : genome) {
if (const auto npixels = interval.npixels(); npixels < num_pixels_threshold) {
for (const auto& [interval, interval_data] : genome) {
if (const auto npixels = interval_data.npixels(); npixels < num_pixels_threshold) {
warnings.emplace_back(fmt::format(FMT_STRING("{}: {} pixels"), interval, npixels));
}
}
Expand Down Expand Up @@ -136,13 +136,13 @@ usize Simulation::simulated_size() const { return this->_genome.simulated_size()
return bwf;
}

static void write_contact_matrix_to_cooler(hictk::cooler::File& cf,
const GenomicInterval& interval) {
static void write_contact_matrix_to_cooler(hictk::cooler::File& cf, const GenomicInterval& interval,
const GenomicIntervalData& interval_data) {
if (!cf) {
return;
}

const auto& matrix = interval.contacts();
const auto& matrix = interval_data.contacts();
if (matrix.npixels() != 0) {
const auto t0 = absl::Now();
spdlog::info(FMT_STRING("[io] writing contacts for {} to file \"{}\"..."), interval, cf.uri());
Expand All @@ -165,21 +165,22 @@ static void write_contact_matrix_to_cooler(hictk::cooler::File& cf,
}

static void write_lef_occupancy_to_bwig(io::bigwig::Writer& bw, const GenomicInterval& interval,
const GenomicIntervalData& interval_data,
const bp_t resolution, std::vector<float>& buff) {
if (!bw || interval.npixels() == 0) {
if (!bw || interval_data.npixels() == 0) {
return;
}

try {
const auto t0 = absl::Now();
spdlog::info(FMT_STRING("[io]: writing 1D LEF occupancy profile for {} to file {}..."),
interval, bw.path());
buff.resize(interval.lef_1d_occupancy().size());
const auto max_element =
std::max_element(interval.lef_1d_occupancy().begin(), interval.lef_1d_occupancy().end())
->load();
buff.resize(interval_data.lef_1d_occupancy().size());
const auto max_element = std::max_element(interval_data.lef_1d_occupancy().begin(),
interval_data.lef_1d_occupancy().end())
->load();

std::transform(interval.lef_1d_occupancy().begin(), interval.lef_1d_occupancy().end(),
std::transform(interval_data.lef_1d_occupancy().begin(), interval_data.lef_1d_occupancy().end(),
buff.begin(), [&](const auto& n) {
return static_cast<float>(static_cast<double>(n.load()) /
static_cast<double>(max_element));
Expand Down Expand Up @@ -228,14 +229,15 @@ void Simulation::simulate_io(std::chrono::milliseconds wait_time) {
auto last_interval = this->_genome.end();

auto ctok = this->_ctx.register_consumer<Task::Status::COMPLETED>();
absl::btree_map<GenomicInterval*, usize> task_map{};
absl::btree_map<std::pair<const GenomicInterval*, GenomicIntervalData*>, usize> task_map{};
std::vector<float> bw_buff{};
while (!!this->_ctx && current_interval != last_interval) {
if (auto it = task_map.find(&(*current_interval)); it != task_map.end() && it->second == 0) {
const auto key = std::make_pair(&current_interval->first, &current_interval->second);
if (auto it = task_map.find(key); it != task_map.end() && it->second == 0) {
// All tasks for the current interval successfully completed!
write_contact_matrix_to_cooler(cf, *it->first);
write_lef_occupancy_to_bwig(bw, *it->first, c().bin_size, bw_buff);
it->first->deallocate();
write_contact_matrix_to_cooler(cf, *it->first.first, *it->first.second);
write_lef_occupancy_to_bwig(bw, *it->first.first, *it->first.second, c().bin_size, bw_buff);
it->first.second->deallocate();
task_map.erase(it);
++current_interval;
continue;
Expand All @@ -246,7 +248,8 @@ void Simulation::simulate_io(std::chrono::milliseconds wait_time) {
task = *task_opt;
// Add interval to task_map if not already present.
// Then decrement the number of tasks still pending.
auto [it, _] = task_map.try_emplace(task.interval, c().num_cells);
auto [it, _] =
task_map.try_emplace(std::make_pair(task.interval, task.interval_data), c().num_cells);
--it->second;
}
}
Expand Down Expand Up @@ -432,8 +435,10 @@ void Simulation::rank_lefs(const absl::Span<const Lef> lefs,
}

if (MODLE_LIKELY(ranks_are_partially_sorted)) {
cppsort::split_sort(rev_lef_rank_buff.begin(), rev_lef_rank_buff.end(), rev_comparator);
cppsort::split_sort(fwd_lef_rank_buff.begin(), fwd_lef_rank_buff.end(), fwd_comparator);
cppsort::split_adapter<cppsort::pdq_sorter>{}(rev_lef_rank_buff.begin(),
rev_lef_rank_buff.end(), rev_comparator);
cppsort::split_adapter<cppsort::pdq_sorter>{}(fwd_lef_rank_buff.begin(),
fwd_lef_rank_buff.end(), fwd_comparator);
} else {
// Fallback to pattern-defeating quicksort we have no information regarding the level of
// pre-sortedness of LEFs
Expand Down Expand Up @@ -747,13 +752,14 @@ Simulation::State& Simulation::State::operator=(const Task& task) {

this->id = task.id;
this->interval = task.interval;
this->interval_data = task.interval_data;
this->cell_id = task.cell_id;
this->num_target_epochs = task.num_target_epochs;
this->num_target_contacts = task.num_target_contacts;
this->num_lefs = task.num_lefs;
if (task.interval) {
this->barriers =
ExtrusionBarriers{task.interval->barriers().begin(), task.interval->barriers().end()};
if (task.interval_data) {
this->barriers = ExtrusionBarriers{task.interval_data->barriers().begin(),
task.interval_data->barriers().end()};
}
this->rand_eng = task.rand_eng;

Expand Down Expand Up @@ -1087,11 +1093,12 @@ usize Simulation::compute_num_lefs(const usize size_bp) const noexcept {

void Simulation::print_status_update(const Task& t) const noexcept {
assert(t.interval);
auto tot_target_epochs = this->compute_tot_target_epochs(t.num_lefs, t.interval->npixels());
assert(t.interval_data);
auto tot_target_epochs = this->compute_tot_target_epochs(t.num_lefs, t.interval_data->npixels());
spdlog::info(FMT_STRING("begin processing {}: simulating ~{} epochs across {} cells using {} "
"LEFs and {} barriers (~{} epochs per cell)..."),
*t.interval, tot_target_epochs, c().num_cells, t.num_lefs,
t.interval->barriers().size(), tot_target_epochs / c().num_cells);
t.interval_data->barriers().size(), tot_target_epochs / c().num_cells);
}

usize Simulation::compute_num_worker_threads(usize num_threads, usize num_intervals,
Expand Down
2 changes: 1 addition & 1 deletion src/libmodle/cpu/simulation_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ auto fmt::formatter<modle::Simulation::Task>::format(const modle::Simulation::Ta
assert(t.interval);
return fmt::format_to(ctx.out(), FMT_STRING("{}\t{}\t{}\t{}\t{}\t{}\t{}"), t.id,
t.interval->chrom(), t.cell_id, t.num_target_epochs, t.num_target_contacts,
t.num_lefs, !!t.interval ? t.interval->barriers().size() : 0);
t.num_lefs, !!t.interval_data ? t.interval_data->barriers().size() : 0);
}

constexpr auto fmt::formatter<modle::Simulation::State>::parse(format_parse_context& ctx)
Expand Down
Loading

0 comments on commit e73c0eb

Please sign in to comment.