From aa54d61bb35000f1a23fdea2fc74b2b0d313bbe0 Mon Sep 17 00:00:00 2001
From: Steve Malton
Date: Thu, 29 Jun 2023 13:49:24 +0100
Subject: [PATCH 1/9] Separate constructors for pair_list and pair_generation
 methods

---
 dorado/cli/duplex.cpp                | 13 +++++-----
 dorado/read_pipeline/PairingNode.cpp | 37 +++++++++++++++-------------
 dorado/read_pipeline/PairingNode.h   |  6 ++++-
 tests/PairingNodeTest.cpp            |  4 +--
 4 files changed, 34 insertions(+), 26 deletions(-)

diff --git a/dorado/cli/duplex.cpp b/dorado/cli/duplex.cpp
index f5d7a352c..732e6afbc 100644
--- a/dorado/cli/duplex.cpp
+++ b/dorado/cli/duplex.cpp
@@ -369,10 +369,11 @@ int duplex(int argc, char* argv[]) {
         StereoDuplexEncoderNode stereo_node =
                 StereoDuplexEncoderNode(*stereo_basecaller_node, simplex_model_stride);
 
-        PairingNode pairing_node(stereo_node,
-                                 template_complement_map.empty()
-                                         ? std::optional<std::map<std::string, std::string>>{}
-                                         : template_complement_map);
+        std::unique_ptr<PairingNode> pairing_node =
+                template_complement_map.empty()
+                        ? std::make_unique<PairingNode>(stereo_node)
+                        : std::make_unique<PairingNode>(stereo_node,
+                                                        std::move(template_complement_map));
 
         // Initialize duplex split settings and create a duplex split node
         // with the given settings and number of devices. If
@@ -380,7 +381,7 @@ int duplex(int argc, char* argv[]) {
         // act as a passthrough, meaning it won't perform any splitting
         // operations and will just pass data through.
         DuplexSplitSettings splitter_settings;
-        DuplexSplitNode splitter_node(pairing_node, splitter_settings, num_devices);
+        DuplexSplitNode splitter_node(*pairing_node, splitter_settings, num_devices);
 
         auto adjusted_simplex_overlap = (overlap / simplex_model_stride) * simplex_model_stride;
 
@@ -398,7 +399,7 @@ int duplex(int argc, char* argv[]) {
         using dorado::stats::make_stats_reporter;
         stats_reporters.push_back(make_stats_reporter(*stereo_basecaller_node));
         stats_reporters.push_back(make_stats_reporter(stereo_node));
-        stats_reporters.push_back(make_stats_reporter(pairing_node));
+        stats_reporters.push_back(make_stats_reporter(*pairing_node));
         stats_reporters.push_back(make_stats_reporter(splitter_node));
         stats_reporters.push_back(make_stats_reporter(*basecaller_node));
         stats_reporters.push_back(make_stats_reporter(loader));
diff --git a/dorado/read_pipeline/PairingNode.cpp b/dorado/read_pipeline/PairingNode.cpp
index 78398d0ee..0ac8cb7bd 100644
--- a/dorado/read_pipeline/PairingNode.cpp
+++ b/dorado/read_pipeline/PairingNode.cpp
@@ -174,26 +174,29 @@ void PairingNode::pair_generating_worker_thread() {
 }
 
 PairingNode::PairingNode(MessageSink& sink,
-                         std::optional<std::map<std::string, std::string>> template_complement_map,
+                         std::map<std::string, std::string> template_complement_map,
                          int num_worker_threads,
                          size_t max_reads)
-        : MessageSink(max_reads), m_sink(sink), m_num_worker_threads(num_worker_threads) {
-    if (template_complement_map.has_value()) {
-        m_template_complement_map = template_complement_map.value();
-        // Set up the complement-template_map
-        for (auto& key : m_template_complement_map) {
-            m_complement_template_map[key.second] = key.first;
-        }
+        : MessageSink(max_reads),
+          m_sink(sink),
+          m_template_complement_map(std::move(template_complement_map)),
+          m_num_worker_threads(num_worker_threads) {
+    // Set up the complement-template_map
+    for (auto& key : m_template_complement_map) {
+        m_complement_template_map[key.second] = key.first;
+    }
 
-        for (size_t i = 0; i < m_num_worker_threads; i++) {
-            m_workers.push_back(std::make_unique<std::thread>(
-                    std::thread(&PairingNode::pair_list_worker_thread, this)));
-        }
-    } else {
-        for (size_t i = 0; i < m_num_worker_threads; i++) {
-            m_workers.push_back(std::make_unique<std::thread>(
-                    std::thread(&PairingNode::pair_generating_worker_thread, this)));
-        }
+    for (size_t i = 0; i < m_num_worker_threads; i++) {
+        m_workers.push_back(std::make_unique<std::thread>(
+                std::thread(&PairingNode::pair_list_worker_thread, this)));
+    }
+}
+
+PairingNode::PairingNode(MessageSink& sink, int num_worker_threads, size_t max_reads)
+        : MessageSink(max_reads), m_sink(sink), m_num_worker_threads(num_worker_threads) {
+    for (size_t i = 0; i < m_num_worker_threads; i++) {
+        m_workers.push_back(std::make_unique<std::thread>(
+                std::thread(&PairingNode::pair_generating_worker_thread, this)));
     }
 }
 
diff --git a/dorado/read_pipeline/PairingNode.h b/dorado/read_pipeline/PairingNode.h
index 95cc76a41..87f487c7f 100644
--- a/dorado/read_pipeline/PairingNode.h
+++ b/dorado/read_pipeline/PairingNode.h
@@ -16,10 +16,14 @@ namespace dorado {
 
 class PairingNode : public MessageSink {
 public:
+    // Template-complement map: uses the pair_list pairing method
     PairingNode(MessageSink& sink,
-                std::optional<std::map<std::string, std::string>> = std::nullopt,
+                std::map<std::string, std::string> template_complement_map,
                 int num_worker_threads = 2,
                 size_t max_reads = 1000);
+
+    // No template-complement map: uses the pair_generation pairing method
+    PairingNode(MessageSink& sink, int num_worker_threads = 2, size_t max_reads = 1000);
     ~PairingNode();
     std::string get_name() const override { return "PairingNode"; }
     stats::NamedStats sample_stats() const override;
diff --git a/tests/PairingNodeTest.cpp b/tests/PairingNodeTest.cpp
index d948fff9a..c769d0b9e 100644
--- a/tests/PairingNodeTest.cpp
+++ b/tests/PairingNodeTest.cpp
@@ -44,8 +44,8 @@ TEST_CASE("Split read pairing", TEST_GROUP) {
     // clang-format on
 
     MessageSinkToVector<dorado::Message> sink(5);
-    dorado::PairingNode pairing_node(sink, std::nullopt, 1,
-                                     1);  // one thread, one read - force reads through in order
+    // one thread, one read - force reads through in order
+    dorado::PairingNode pairing_node(sink, 1, 1);
     for (auto& read : reads) {
         pairing_node.push_message(std::move(read));
     }

From c3a9f8fc1c879f787c4e88eb2c1080847bc5ce14 Mon Sep 17 00:00:00 2001
From: Steve Malton
Date: Thu, 29 Jun 2023 13:49:48 +0100
Subject: [PATCH 2/9] Order by start_time_ms rather than string timestamp

---
 dorado/read_pipeline/PairingNode.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dorado/read_pipeline/PairingNode.cpp b/dorado/read_pipeline/PairingNode.cpp
index 0ac8cb7bd..99c0f62d2 100644
--- a/dorado/read_pipeline/PairingNode.cpp
+++ b/dorado/read_pipeline/PairingNode.cpp
@@ -123,7 +123,7 @@ void PairingNode::pair_generating_worker_thread() {
 
         auto compare_reads_by_time = [](const std::shared_ptr<Read>& read1,
                                         const std::shared_ptr<Read>& read2) {
-            return read1->attributes.start_time < read2->attributes.start_time;
+            return read1->start_time_ms < read2->start_time_ms;
         };
 
         if (channel_mux_read_map.count(key)) {

From 52d8cd202ab6534cf68808ac9ae4181aac65704e Mon Sep 17 00:00:00 2001
From: Steve Malton
Date: Thu, 29 Jun 2023 13:55:56 +0100
Subject: [PATCH 3/9] Reduce number of lookups in read pair generation

---
 dorado/read_pipeline/PairingNode.cpp | 40 +++++++++++++---------------
 1 file changed, 18 insertions(+), 22 deletions(-)

diff --git a/dorado/read_pipeline/PairingNode.cpp b/dorado/read_pipeline/PairingNode.cpp
index 99c0f62d2..711124ae6 100644
--- a/dorado/read_pipeline/PairingNode.cpp
+++ b/dorado/read_pipeline/PairingNode.cpp
@@ -84,6 +84,11 @@ void PairingNode::pair_list_worker_thread() {
 }
 
 void PairingNode::pair_generating_worker_thread() {
+    auto compare_reads_by_time = [](const std::shared_ptr<Read>& read1,
+                                    const std::shared_ptr<Read>& read2) {
+        return read1->start_time_ms < read2->start_time_ms;
+    };
+
     Message message;
     while (m_work_queue.try_pop(message)) {
         // If this message isn't a read, we'll get a bad_variant_access exception.
@@ -98,12 +103,15 @@ void PairingNode::pair_generating_worker_thread() {
         std::string flowcell_id = read->flowcell_id;
         int32_t client_id = read->client_id;
 
         int max_num_keys = 10;
         std::unique_lock<std::mutex> lock(m_pairing_mtx);
         UniquePoreIdentifierKey key = std::make_tuple(channel, mux, run_id, flowcell_id, client_id);
-        auto found = channel_mux_read_map.find(key);
+        auto read_list_iter = channel_mux_read_map.find(key);
         // Check if the key is already in the list
-        if (found == channel_mux_read_map.end()) {
+        if (read_list_iter == channel_mux_read_map.end()) {
             // Key is not in the deque
+            // Add the new key to the end of the list
+            m_working_channel_mux_keys.push_back(key);
+
             channel_mux_read_map.insert({key, {read}});
-            if (m_working_channel_mux_keys.size() >= max_num_keys) {
+            if (m_working_channel_mux_keys.size() > max_num_keys) {
                 // Remove the oldest key (front of the list)
                 auto oldest_key = m_working_channel_mux_keys.front();
                 m_working_channel_mux_keys.pop_front();
@@ -117,21 +125,12 @@ void PairingNode::pair_generating_worker_thread() {
                 channel_mux_read_map.erase(oldest_key);
                 assert(channel_mux_read_map.size() == m_working_channel_mux_keys.size());
             }
-            // Add the new key to the end of the list
-            m_working_channel_mux_keys.push_back(key);
-        }
-
-        auto compare_reads_by_time = [](const std::shared_ptr<Read>& read1,
-                                        const std::shared_ptr<Read>& read2) {
-            return read1->start_time_ms < read2->start_time_ms;
-        };
-
-        if (channel_mux_read_map.count(key)) {
-            auto later_read =
-                    std::lower_bound(channel_mux_read_map[key].begin(),
-                                     channel_mux_read_map[key].end(), read, compare_reads_by_time);
+        } else {
+            auto& cached_read_list = read_list_iter->second;
+            auto later_read = std::lower_bound(cached_read_list.begin(), cached_read_list.end(),
+                                               read, compare_reads_by_time);
 
-            if (later_read != channel_mux_read_map[key].begin()) {
+            if (later_read != cached_read_list.begin()) {
                 auto earlier_read = std::prev(later_read);
 
                 if (is_within_time_and_length_criteria(*earlier_read, read)) {
@@ -141,7 +140,7 @@ void PairingNode::pair_generating_worker_thread() {
                 }
             }
 
-            if (later_read != channel_mux_read_map[key].end()) {
+            if (later_read != cached_read_list.end()) {
                 if (is_within_time_and_length_criteria(read, *later_read)) {
                     ReadPair pair = {read, *later_read};
                     ++read->num_duplex_candidate_pairs;
@@ -149,10 +148,7 @@ void PairingNode::pair_generating_worker_thread() {
                 }
             }
 
-            channel_mux_read_map[key].insert(later_read, read);
-
-        } else {
-            channel_mux_read_map[key].push_back(read);
+            cached_read_list.insert(later_read, read);
         }
     }
     if (--m_num_worker_threads == 0) {

From 4003fb09a8c412b1edd6891e4921e664e5867651 Mon Sep 17 00:00:00 2001
From: Steve Malton
Date: Thu, 29 Jun 2023 14:20:11 +0100
Subject: [PATCH 4/9] Add expected read ordering parameter

Cache strategy changes depending on whether we expect reads to be in
pore_order (where we need to keep all the reads for a few pores) or in
time_order (where we can keep only a few reads, but for every pore).
---
 dorado/read_pipeline/PairingNode.cpp | 25 ++++++++++++++++++++-----
 dorado/read_pipeline/PairingNode.h   |  9 +++++++--
 tests/PairingNodeTest.cpp            |  2 +-
 3 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/dorado/read_pipeline/PairingNode.cpp b/dorado/read_pipeline/PairingNode.cpp
index 711124ae6..173b24fe2 100644
--- a/dorado/read_pipeline/PairingNode.cpp
+++ b/dorado/read_pipeline/PairingNode.cpp
@@ -83,7 +83,7 @@ void PairingNode::pair_list_worker_thread() {
     }
 }
 
-void PairingNode::pair_generating_worker_thread() {
+void PairingNode::pair_generating_worker_thread(size_t max_num_keys, size_t max_num_reads) {
     auto compare_reads_by_time = [](const std::shared_ptr<Read>& read1,
                                     const std::shared_ptr<Read>& read2) {
         return read1->start_time_ms < read2->start_time_ms;
@@ -100,7 +100,6 @@ void PairingNode::pair_generating_worker_thread() {
         std::string flowcell_id = read->flowcell_id;
         int32_t client_id = read->client_id;
 
-        int max_num_keys = 10;
         std::unique_lock<std::mutex> lock(m_pairing_mtx);
         UniquePoreIdentifierKey key = std::make_tuple(channel, mux, run_id, flowcell_id, client_id);
         auto read_list_iter = channel_mux_read_map.find(key);
@@ -149,8 +148,12 @@ void PairingNode::pair_generating_worker_thread() {
             }
 
             cached_read_list.insert(later_read, read);
+            while (cached_read_list.size() > max_num_reads) {
+                cached_read_list.pop_front();
+            }
         }
     }
+
     if (--m_num_worker_threads == 0) {
         std::unique_lock<std::mutex> lock(m_pairing_mtx);
         // There are still reads in channel_mux_read_map. Push them to the sink.
@@ -188,11 +191,23 @@ PairingNode::PairingNode(MessageSink& sink,
     }
 }
 
-PairingNode::PairingNode(MessageSink& sink, int num_worker_threads, size_t max_reads)
+PairingNode::PairingNode(MessageSink& sink,
+                         ReadOrder read_order,
+                         int num_worker_threads,
+                         size_t max_reads)
         : MessageSink(max_reads), m_sink(sink), m_num_worker_threads(num_worker_threads) {
+    size_t max_num_keys = std::numeric_limits<size_t>::max();
+    size_t max_num_reads = std::numeric_limits<size_t>::max();
+
+    if (read_order == ReadOrder::pore_order) {
+        max_num_keys = 10;
+    } else if (read_order == ReadOrder::time_order) {
+        max_num_reads = 10;
+    }
+
     for (size_t i = 0; i < m_num_worker_threads; i++) {
-        m_workers.push_back(std::make_unique<std::thread>(
-                std::thread(&PairingNode::pair_generating_worker_thread, this)));
+        m_workers.push_back(std::make_unique<std::thread>(std::thread(
+                &PairingNode::pair_generating_worker_thread, this, max_num_keys, max_num_reads)));
     }
 }
diff --git a/dorado/read_pipeline/PairingNode.h b/dorado/read_pipeline/PairingNode.h
index 87f487c7f..fc91bd82d 100644
--- a/dorado/read_pipeline/PairingNode.h
+++ b/dorado/read_pipeline/PairingNode.h
@@ -16,18 +16,25 @@ namespace dorado {
 
 class PairingNode : public MessageSink {
 public:
+    enum class ReadOrder { pore_order = 0, time_order };
+
     // Template-complement map: uses the pair_list pairing method
     PairingNode(MessageSink& sink,
                 std::map<std::string, std::string> template_complement_map,
                 int num_worker_threads = 2,
                 size_t max_reads = 1000);
 
     // No template-complement map: uses the pair_generation pairing method
-    PairingNode(MessageSink& sink, int num_worker_threads = 2, size_t max_reads = 1000);
+    PairingNode(MessageSink& sink,
+                ReadOrder read_order = ReadOrder::pore_order,
+                int num_worker_threads = 2,
+                size_t max_reads = 1000);
     ~PairingNode();
     std::string get_name() const override { return "PairingNode"; }
     stats::NamedStats sample_stats() const override;
 
 private:
     void pair_list_worker_thread();
-    void pair_generating_worker_thread();
+    void pair_generating_worker_thread(size_t max_num_keys, size_t max_num_reads);
 
     // A key for a unique Pore, Duplex reads must have the same UniquePoreIdentifierKey
     // The values are channel, mux, run_id, flowcell_id, client_id
diff --git a/tests/PairingNodeTest.cpp b/tests/PairingNodeTest.cpp
index c769d0b9e..6f87b795a 100644
--- a/tests/PairingNodeTest.cpp
+++ b/tests/PairingNodeTest.cpp
@@ -45,7 +45,7 @@ TEST_CASE("Split read pairing", TEST_GROUP) {
 
     MessageSinkToVector<dorado::Message> sink(5);
     // one thread, one read - force reads through in order
-    dorado::PairingNode pairing_node(sink, 1, 1);
pairing_node(sink, 1, 1); + dorado::PairingNode pairing_node(sink, dorado::PairingNode::ReadOrder::pore_order, 1, 1); for (auto& read : reads) { pairing_node.push_message(std::move(read)); } From bba650a97ae061d6f0c5c7a6b6ae69cc757235db Mon Sep 17 00:00:00 2001 From: Steve Malton Date: Fri, 30 Jun 2023 15:09:43 +0100 Subject: [PATCH 5/9] Fix lock being held unnecessarily --- dorado/read_pipeline/PairingNode.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dorado/read_pipeline/PairingNode.cpp b/dorado/read_pipeline/PairingNode.cpp index 173b24fe2..4c6babb8a 100644 --- a/dorado/read_pipeline/PairingNode.cpp +++ b/dorado/read_pipeline/PairingNode.cpp @@ -36,6 +36,7 @@ void PairingNode::pair_list_worker_thread() { partner_found = true; } else { { + tc_lock.unlock(); std::lock_guard ct_lock(m_ct_map_mutex); auto it = m_complement_template_map.find(read->read_id); if (it != m_complement_template_map.end()) { From d852cd854c7983aac29e85c2d91cf5fbd3958946 Mon Sep 17 00:00:00 2001 From: Steve Malton Date: Fri, 30 Jun 2023 15:12:39 +0100 Subject: [PATCH 6/9] Rename and reorder variables --- dorado/read_pipeline/PairingNode.cpp | 26 +++++++++++++------------- dorado/read_pipeline/PairingNode.h | 18 ++++++++++-------- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/dorado/read_pipeline/PairingNode.cpp b/dorado/read_pipeline/PairingNode.cpp index 4c6babb8a..29eb85666 100644 --- a/dorado/read_pipeline/PairingNode.cpp +++ b/dorado/read_pipeline/PairingNode.cpp @@ -48,14 +48,14 @@ void PairingNode::pair_list_worker_thread() { if (partner_found) { std::unique_lock read_cache_lock(m_read_cache_mutex); - if (read_cache.find(partner_id) == read_cache.end()) { + auto partner_read_itr = m_read_cache.find(partner_id); + if (partner_read_itr == m_read_cache.end()) { // Partner is not in the read cache - read_cache[read->read_id] = read; + m_read_cache.insert({read->read_id, read}); read_cache_lock.unlock(); } else { - auto partner_read_itr = read_cache.find(partner_id); auto partner_read = partner_read_itr->second; - read_cache.erase(partner_read_itr); + m_read_cache.erase(partner_read_itr); read_cache_lock.unlock(); std::shared_ptr template_read; @@ -103,27 +103,27 @@ void PairingNode::pair_generating_worker_thread(size_t max_num_keys, size_t max_ std::unique_lock lock(m_pairing_mtx); UniquePoreIdentifierKey key = std::make_tuple(channel, mux, run_id, flowcell_id, client_id); - auto read_list_iter = channel_mux_read_map.find(key); + auto read_list_iter = m_channel_mux_read_map.find(key); // Check if the key is already in the list - if (read_list_iter == channel_mux_read_map.end()) { + if (read_list_iter == m_channel_mux_read_map.end()) { // Key is not in the dequeue // Add the new key to the end of the list m_working_channel_mux_keys.push_back(key); - channel_mux_read_map.insert({key, {read}}); + m_channel_mux_read_map.insert({key, {read}}); if (m_working_channel_mux_keys.size() > max_num_keys) { // Remove the oldest key (front of the list) auto oldest_key = m_working_channel_mux_keys.front(); m_working_channel_mux_keys.pop_front(); - auto oldest_key_it = channel_mux_read_map.find(oldest_key); + auto oldest_key_it = m_channel_mux_read_map.find(oldest_key); // Remove the oldest key from the map for (auto read_ptr : oldest_key_it->second) { m_sink.push_message(read_ptr); } - channel_mux_read_map.erase(oldest_key); - assert(channel_mux_read_map.size() == m_working_channel_mux_keys.size()); + m_channel_mux_read_map.erase(oldest_key); + assert(m_channel_mux_read_map.size() == 
             }
         } else {
             auto& cached_read_list = read_list_iter->second;
@@ -159,7 +159,7 @@ void PairingNode::pair_generating_worker_thread(size_t max_num_keys, size_t max
         std::unique_lock<std::mutex> lock(m_pairing_mtx);
         // There are still reads in channel_mux_read_map. Push them to the sink.
         // Last thread alive is responsible for cleaning up the cache.
-        for (const auto& kv : channel_mux_read_map) {
+        for (const auto& kv : m_channel_mux_read_map) {
             // kv is a std::pair<UniquePoreIdentifierKey, std::list<std::shared_ptr<Read>>>
             const auto& reads_list = kv.second;
 
@@ -179,8 +179,8 @@ PairingNode::PairingNode(MessageSink& sink,
                          size_t max_reads)
         : MessageSink(max_reads),
           m_sink(sink),
-          m_template_complement_map(std::move(template_complement_map)),
-          m_num_worker_threads(num_worker_threads) {
+          m_num_worker_threads(num_worker_threads),
+          m_template_complement_map(std::move(template_complement_map)) {
     // Set up the complement-template_map
     for (auto& key : m_template_complement_map) {
         m_complement_template_map[key.second] = key.first;
diff --git a/dorado/read_pipeline/PairingNode.h b/dorado/read_pipeline/PairingNode.h
index fc91bd82d..9840a0cff 100644
--- a/dorado/read_pipeline/PairingNode.h
+++ b/dorado/read_pipeline/PairingNode.h
@@ -41,24 +41,26 @@ class PairingNode : public MessageSink {
     // The values are channel, mux, run_id, flowcell_id, client_id
    using UniquePoreIdentifierKey = std::tuple<int, int, std::string, std::string, int32_t>;
 
-    std::vector<std::unique_ptr<std::thread>> m_workers;
     MessageSink& m_sink;
-    std::map<std::string, std::string> m_template_complement_map;
-    std::map<std::string, std::string> m_complement_template_map;
+    std::vector<std::unique_ptr<std::thread>> m_workers;
+    std::atomic<int> m_num_worker_threads;
+
+    // Members for pair_list method
 
     std::mutex m_tc_map_mutex;
     std::mutex m_ct_map_mutex;
     std::mutex m_read_cache_mutex;
-    std::atomic<int> m_num_worker_threads;
+    std::map<std::string, std::string> m_template_complement_map;
+    std::map<std::string, std::string> m_complement_template_map;
+    std::map<std::string, std::shared_ptr<Read>> m_read_cache;
 
-    std::map<std::string, std::shared_ptr<Read>> read_cache;
+    // Members for pair_generating method
 
-    std::map<UniquePoreIdentifierKey, std::list<std::shared_ptr<Read>>> channel_mux_read_map;
+    std::mutex m_pairing_mtx;
 
+    std::map<UniquePoreIdentifierKey, std::list<std::shared_ptr<Read>>> m_channel_mux_read_map;
     std::deque<UniquePoreIdentifierKey> m_working_channel_mux_keys;
-
-    std::mutex m_pairing_mtx;
 };
 
 } // namespace dorado

From cb9727fc138a31a3d8b43077ad1cfc54a4c9bc4e Mon Sep 17 00:00:00 2001
From: Steve Malton
Date: Fri, 30 Jun 2023 15:13:13 +0100
Subject: [PATCH 7/9] Make cache sizes member variables

---
 dorado/read_pipeline/PairingNode.cpp | 23 ++++++++++++-----------
 dorado/read_pipeline/PairingNode.h   |  4 +++-
 2 files changed, 15 insertions(+), 12 deletions(-)

diff --git a/dorado/read_pipeline/PairingNode.cpp b/dorado/read_pipeline/PairingNode.cpp
index 29eb85666..f95d3d0f5 100644
--- a/dorado/read_pipeline/PairingNode.cpp
+++ b/dorado/read_pipeline/PairingNode.cpp
@@ -84,7 +84,7 @@ void PairingNode::pair_list_worker_thread() {
     }
 }
 
-void PairingNode::pair_generating_worker_thread(size_t max_num_keys, size_t max_num_reads) {
+void PairingNode::pair_generating_worker_thread() {
     auto compare_reads_by_time = [](const std::shared_ptr<Read>& read1,
                                     const std::shared_ptr<Read>& read2) {
         return read1->start_time_ms < read2->start_time_ms;
@@ -111,7 +111,7 @@ void PairingNode::pair_generating_worker_thread() {
             // Add the new key to the end of the list
             m_working_channel_mux_keys.push_back(key);
 
             m_channel_mux_read_map.insert({key, {read}});
-            if (m_working_channel_mux_keys.size() > max_num_keys) {
+            if (m_working_channel_mux_keys.size() > m_max_num_keys) {
                 // Remove the oldest key (front of the list)
                 auto oldest_key = m_working_channel_mux_keys.front();
@@ -149,7 +149,7 @@ void PairingNode::pair_generating_worker_thread() {
                 }
             }
 
             cached_read_list.insert(later_read, read);
-            while (cached_read_list.size() > max_num_reads) {
+            while (cached_read_list.size() > m_max_num_reads) {
                 cached_read_list.pop_front();
             }
         }
@@ -196,19 +196,20 @@ PairingNode::PairingNode(MessageSink& sink,
 PairingNode::PairingNode(MessageSink& sink,
                          ReadOrder read_order,
                          int num_worker_threads,
                          size_t max_reads)
-        : MessageSink(max_reads), m_sink(sink), m_num_worker_threads(num_worker_threads) {
-    size_t max_num_keys = std::numeric_limits<size_t>::max();
-    size_t max_num_reads = std::numeric_limits<size_t>::max();
-
+        : MessageSink(max_reads),
+          m_sink(sink),
+          m_num_worker_threads(num_worker_threads),
+          m_max_num_keys(std::numeric_limits<size_t>::max()),
+          m_max_num_reads(std::numeric_limits<size_t>::max()) {
     if (read_order == ReadOrder::pore_order) {
-        max_num_keys = 10;
+        m_max_num_keys = 10;
     } else if (read_order == ReadOrder::time_order) {
-        max_num_reads = 10;
+        m_max_num_reads = 10;
     }
 
     for (size_t i = 0; i < m_num_worker_threads; i++) {
-        m_workers.push_back(std::make_unique<std::thread>(std::thread(
-                &PairingNode::pair_generating_worker_thread, this, max_num_keys, max_num_reads)));
+        m_workers.push_back(std::make_unique<std::thread>(
+                std::thread(&PairingNode::pair_generating_worker_thread, this)));
     }
 }
diff --git a/dorado/read_pipeline/PairingNode.h b/dorado/read_pipeline/PairingNode.h
index 9840a0cff..88f83a79e 100644
--- a/dorado/read_pipeline/PairingNode.h
+++ b/dorado/read_pipeline/PairingNode.h
@@ -35,7 +35,7 @@ class PairingNode : public MessageSink {
 
 private:
     void pair_list_worker_thread();
-    void pair_generating_worker_thread(size_t max_num_keys, size_t max_num_reads);
+    void pair_generating_worker_thread();
 
     // A key for a unique Pore, Duplex reads must have the same UniquePoreIdentifierKey
     // The values are channel, mux, run_id, flowcell_id, client_id
@@ -61,6 +61,8 @@ class PairingNode : public MessageSink {
 
     std::map<UniquePoreIdentifierKey, std::list<std::shared_ptr<Read>>> m_channel_mux_read_map;
     std::deque<UniquePoreIdentifierKey> m_working_channel_mux_keys;
+    size_t m_max_num_keys;
+    size_t m_max_num_reads;
 };
 
 } // namespace dorado

From a31f8743916461e4fa024a195e9eadeca5596576 Mon Sep 17 00:00:00 2001
From: Steve Malton
Date: Fri, 30 Jun 2023 15:22:18 +0100
Subject: [PATCH 8/9] Docs

---
 dorado/read_pipeline/PairingNode.h | 28 ++++++++++++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/dorado/read_pipeline/PairingNode.h b/dorado/read_pipeline/PairingNode.h
index 88f83a79e..2a6d019c6 100644
--- a/dorado/read_pipeline/PairingNode.h
+++ b/dorado/read_pipeline/PairingNode.h
@@ -34,7 +34,21 @@ class PairingNode : public MessageSink {
     stats::NamedStats sample_stats() const override;
 
 private:
+    /**
+     * This is a worker thread function for pairing reads based on a specified list of template-complement pairs.
+     */
     void pair_list_worker_thread();
+
+    /**
+     * This is a worker thread function for generating pairs of reads that fall within pairing criteria.
+     *
+     * The function goes through the incoming messages, which are expected to be reads. For each read, it finds its pore
+     * in the list of active pores. If the pore isn't in the list yet, it is added. If the list of active pores has reached
+     * its maximum size (m_max_num_keys), the oldest pore is removed from the list, and its associated reads are discarded.
+     * The function then inserts the new read into the sorted list of reads for its pore, and checks if it can be paired
+     * with the reads immediately before and after it in the list. If the list of reads for a pore has reached its maximum
+     * size (m_max_num_reads), the oldest read is removed from the list.
+ */ void pair_generating_worker_thread(); // A key for a unique Pore, Duplex reads must have the same UniquePoreIdentifierKey @@ -61,7 +75,21 @@ class PairingNode : public MessageSink { std::map>> m_channel_mux_read_map; std::deque m_working_channel_mux_keys; + + /** + * The maximum number of different channels (pores) to keep in memory concurrently. + * This parameter is crucial when reads are expected to be delivered in channel/pore order. In this order, + * once a read from a specific pore is processed, it is guaranteed that no other reads from that pore will appear. + * Thus, the function can limit memory usage by only keeping reads from a fixed number of pores (channels) in memory. + */ size_t m_max_num_keys; + + /** + * The maximum number of reads from a specific pore to keep in memory. This parameter is + * crucial when reads are expected to be delivered in time order. In this order, reads from the same pore could + * appear at any point in the stream. Thus, the function keeps a limited history of reads for each pore in memory. + * It ensures that the memory usage is controlled, while the reads needed for pairing are available. + */ size_t m_max_num_reads; }; From eb375aac8c443537e94faa33eed9190d07533b39 Mon Sep 17 00:00:00 2001 From: Steve Malton Date: Fri, 30 Jun 2023 15:47:54 +0100 Subject: [PATCH 9/9] Use same enum to set traversal order for dataloader and pairing node --- dorado/cli/duplex.cpp | 5 +++-- dorado/data_loader/DataLoader.cpp | 8 ++++---- dorado/data_loader/DataLoader.h | 8 ++------ dorado/read_pipeline/PairingNode.cpp | 10 ++++++++-- dorado/read_pipeline/PairingNode.h | 5 ++--- dorado/utils/types.h | 15 +++++++++++++++ tests/DuplexSplitTest.cpp | 2 +- tests/PairingNodeTest.cpp | 2 +- tests/Pod5DataLoaderTest.cpp | 2 +- 9 files changed, 37 insertions(+), 20 deletions(-) diff --git a/dorado/cli/duplex.cpp b/dorado/cli/duplex.cpp index 732e6afbc..c98651d6d 100644 --- a/dorado/cli/duplex.cpp +++ b/dorado/cli/duplex.cpp @@ -28,6 +28,7 @@ #include "utils/models.h" #include "utils/parameters.h" +#include "utils/types.h" #include #include @@ -371,7 +372,7 @@ int duplex(int argc, char* argv[]) { std::unique_ptr pairing_node = template_complement_map.empty() - ? std::make_unique(stereo_node) + ? std::make_unique(stereo_node, ReadOrder::BY_CHANNEL) : std::make_unique(stereo_node, std::move(template_complement_map)); @@ -410,7 +411,7 @@ int duplex(int argc, char* argv[]) { kStatsPeriod, stats_reporters, stats_callables); // End stats counting setup. - loader.load_reads(reads, parser.get("--recursive"), DataLoader::BY_CHANNEL); + loader.load_reads(reads, parser.get("--recursive"), ReadOrder::BY_CHANNEL); bam_writer->join(); // Explicitly wait for all output rows to be written. stats_sampler->terminate(); } diff --git a/dorado/data_loader/DataLoader.cpp b/dorado/data_loader/DataLoader.cpp index c4404ae61..bf4592d08 100644 --- a/dorado/data_loader/DataLoader.cpp +++ b/dorado/data_loader/DataLoader.cpp @@ -164,7 +164,7 @@ void DataLoader::load_reads(const std::string& path, auto iterate_directory = [&](const auto& iterator_fn) { switch (traversal_order) { - case BY_CHANNEL: + case ReadOrder::BY_CHANNEL: // If traversal in channel order is required, the following algorithm // is used - // 1. 
            // 1. iterate through all the read metadata to collect channel information
@@ -199,7 +199,7 @@ void DataLoader::load_reads(const std::string& path,
                 }
             }
             break;
-        case UNRESTRICTED:
+        case ReadOrder::UNRESTRICTED:
             for (const auto& entry : iterator_fn(path)) {
                 if (m_loaded_read_count == m_max_reads) {
                     break;
@@ -215,8 +215,8 @@ void DataLoader::load_reads(const std::string& path,
             }
             break;
         default:
-            throw std::runtime_error("Unsupported traversal order detected " +
-                                     std::to_string(traversal_order));
+            throw std::runtime_error("Unsupported traversal order detected: " +
+                                     dorado::to_string(traversal_order));
         }
     };
 
diff --git a/dorado/data_loader/DataLoader.h b/dorado/data_loader/DataLoader.h
index 653de3875..c09a72c5e 100644
--- a/dorado/data_loader/DataLoader.h
+++ b/dorado/data_loader/DataLoader.h
@@ -1,5 +1,6 @@
 #pragma once
 #include "utils/stats.h"
+#include "utils/types.h"
 
 #include
 #include
@@ -28,11 +29,6 @@ using Pod5Ptr = std::unique_ptr<Pod5FileReader_t, Pod5Destructor>;
 
 class DataLoader {
 public:
-    enum ReadOrder {
-        UNRESTRICTED,
-        BY_CHANNEL,
-    };
-
     DataLoader(MessageSink& read_sink,
                const std::string& device,
                size_t num_worker_threads,
@@ -42,7 +38,7 @@ class DataLoader {
     ~DataLoader() = default;
     void load_reads(const std::string& path,
                     bool recursive_file_loading = false,
-                    ReadOrder traversal_order = UNRESTRICTED);
+                    ReadOrder traversal_order = ReadOrder::UNRESTRICTED);
 
     static std::unordered_map<std::string, ReadGroup> load_read_groups(
             std::string data_path,
diff --git a/dorado/read_pipeline/PairingNode.cpp b/dorado/read_pipeline/PairingNode.cpp
index f95d3d0f5..e609d23f8 100644
--- a/dorado/read_pipeline/PairingNode.cpp
+++ b/dorado/read_pipeline/PairingNode.cpp
@@ -201,10 +201,16 @@ PairingNode::PairingNode(MessageSink& sink,
           m_num_worker_threads(num_worker_threads),
           m_max_num_keys(std::numeric_limits<size_t>::max()),
           m_max_num_reads(std::numeric_limits<size_t>::max()) {
-    if (read_order == ReadOrder::pore_order) {
+    switch (read_order) {
+    case ReadOrder::BY_CHANNEL:
         m_max_num_keys = 10;
-    } else if (read_order == ReadOrder::time_order) {
+        break;
+    case ReadOrder::BY_TIME:
         m_max_num_reads = 10;
+        break;
+    default:
+        throw std::runtime_error("Unsupported read order detected: " +
+                                 dorado::to_string(read_order));
     }
 
     for (size_t i = 0; i < m_num_worker_threads; i++) {
diff --git a/dorado/read_pipeline/PairingNode.h b/dorado/read_pipeline/PairingNode.h
index 2a6d019c6..b0fc8dae1 100644
--- a/dorado/read_pipeline/PairingNode.h
+++ b/dorado/read_pipeline/PairingNode.h
@@ -2,6 +2,7 @@
 
 #include "ReadPipeline.h"
 #include "utils/stats.h"
+#include "utils/types.h"
 
 #include
 #include
@@ -16,8 +17,6 @@ namespace dorado {
 
 class PairingNode : public MessageSink {
 public:
-    enum class ReadOrder { pore_order = 0, time_order };
-
     // Template-complement map: uses the pair_list pairing method
     PairingNode(MessageSink& sink,
                 std::map<std::string, std::string> template_complement_map,
@@ -26,7 +25,7 @@ public:
 
     // No template-complement map: uses the pair_generation pairing method
     PairingNode(MessageSink& sink,
-                ReadOrder read_order = ReadOrder::pore_order,
+                ReadOrder read_order,
                 int num_worker_threads = 2,
                 size_t max_reads = 1000);
diff --git a/dorado/utils/types.h b/dorado/utils/types.h
index 46cf34c77..83156c826 100644
--- a/dorado/utils/types.h
+++ b/dorado/utils/types.h
@@ -21,4 +21,19 @@ struct BamDestructor {
 };
 using BamPtr = std::unique_ptr<bam1_t, BamDestructor>;
 
+enum class ReadOrder { UNRESTRICTED, BY_CHANNEL, BY_TIME };
+
+inline std::string to_string(ReadOrder read_order) {
+    switch (read_order) {
+    case ReadOrder::UNRESTRICTED:
+        return "UNRESTRICTED";
+    case ReadOrder::BY_CHANNEL:
+        return "BY_CHANNEL";
+    case ReadOrder::BY_TIME:
+        return "BY_TIME";
+    default:
+        return "Unknown";
+    }
+}
+
 } // namespace dorado
diff --git a/tests/DuplexSplitTest.cpp b/tests/DuplexSplitTest.cpp
index e26d4183c..32562738e 100644
--- a/tests/DuplexSplitTest.cpp
+++ b/tests/DuplexSplitTest.cpp
@@ -90,7 +90,7 @@ TEST_CASE("4 subread split tagging", TEST_GROUP) {
     MessageSinkToVector<std::shared_ptr<dorado::Read>> sink(3);
     dorado::SubreadTaggerNode tag_node(sink);
     dorado::StereoDuplexEncoderNode stereo_node(tag_node, read->model_stride);
-    dorado::PairingNode pairing_node(stereo_node);
+    dorado::PairingNode pairing_node(stereo_node, dorado::ReadOrder::BY_CHANNEL);
     dorado::DuplexSplitSettings splitter_settings;
     dorado::DuplexSplitNode splitter_node(pairing_node, splitter_settings, 1);
 
diff --git a/tests/PairingNodeTest.cpp b/tests/PairingNodeTest.cpp
index 6f87b795a..3a942176d 100644
--- a/tests/PairingNodeTest.cpp
+++ b/tests/PairingNodeTest.cpp
@@ -45,7 +45,7 @@ TEST_CASE("Split read pairing", TEST_GROUP) {
 
     MessageSinkToVector<dorado::Message> sink(5);
     // one thread, one read - force reads through in order
-    dorado::PairingNode pairing_node(sink, dorado::PairingNode::ReadOrder::pore_order, 1, 1);
+    dorado::PairingNode pairing_node(sink, dorado::ReadOrder::BY_CHANNEL, 1, 1);
     for (auto& read : reads) {
         pairing_node.push_message(std::move(read));
     }
diff --git a/tests/Pod5DataLoaderTest.cpp b/tests/Pod5DataLoaderTest.cpp
index 4d98a0260..9ebf9d558 100644
--- a/tests/Pod5DataLoaderTest.cpp
+++ b/tests/Pod5DataLoaderTest.cpp
@@ -124,7 +124,7 @@ TEST_CASE(TEST_GROUP "Load data sorted by channel id.") {
 
     MessageSinkToVector<std::shared_ptr<dorado::Read>> sink(100);
     dorado::DataLoader loader(sink, "cpu", 1, 0);
-    loader.load_reads(data_path, true, dorado::DataLoader::ReadOrder::BY_CHANNEL);
+    loader.load_reads(data_path, true, dorado::ReadOrder::BY_CHANNEL);
 
     auto reads = sink.get_messages();
     int start_channel_id = -1;
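
Taken together, the series leaves PairingNode with two constructors: one that takes an explicit template-complement map (the pair_list method) and one that takes a ReadOrder and generates candidate pairs on the fly, sizing its caches according to the expected delivery order. Below is a minimal sketch of how a caller would choose between them after patch 9; the make_pairing_node helper is hypothetical, but the constructor signatures and the ReadOrder enum are taken from the diffs above.

```cpp
#include <map>
#include <memory>
#include <string>
#include <utility>

#include "read_pipeline/PairingNode.h"  // dorado::PairingNode, dorado::MessageSink
#include "utils/types.h"                // dorado::ReadOrder

// Hypothetical helper mirroring the selection logic in duplex.cpp:
// an empty template-complement map selects the pair_generating method,
// a non-empty one selects the pair_list method.
std::unique_ptr<dorado::PairingNode> make_pairing_node(
        dorado::MessageSink& sink,
        std::map<std::string, std::string> template_complement_map) {
    if (template_complement_map.empty()) {
        // BY_CHANNEL: reads arrive grouped by pore, so the node caps the
        // number of live pores (m_max_num_keys = 10) and keeps every read
        // for those pores. BY_TIME would instead cap the reads kept per
        // pore (m_max_num_reads = 10) while tracking every pore.
        return std::make_unique<dorado::PairingNode>(sink, dorado::ReadOrder::BY_CHANNEL);
    }
    // Explicit template/complement pairs: pair by map lookup instead.
    return std::make_unique<dorado::PairingNode>(sink, std::move(template_complement_map));
}
```

Note the trade-off the two cache caps encode: with channel-ordered input, all reads from a pore arrive together, so bounding the number of pores bounds memory; with time-ordered input, a read's partner can arrive at any point, so every pore must stay live and the per-pore history is bounded instead.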