diff --git a/dorado/cli/aligner.cpp b/dorado/cli/aligner.cpp index ee6739e45..9bda41ff7 100644 --- a/dorado/cli/aligner.cpp +++ b/dorado/cli/aligner.cpp @@ -107,11 +107,10 @@ int aligner(int argc, char* argv[]) { add_pg_hdr(header); PipelineDescriptor pipeline_desc; - auto aligner = pipeline_desc.add_node({}, index, kmer_size, window_size, - index_batch_size, aligner_threads); auto hts_writer = pipeline_desc.add_node({}, "-", HtsWriter::OutputMode::BAM, writer_threads, 0); - pipeline_desc.add_node_sink(aligner, hts_writer); + auto aligner = pipeline_desc.add_node({hts_writer}, index, kmer_size, window_size, + index_batch_size, aligner_threads); // Create the Pipeline from our description. std::vector stats_reporters; @@ -146,7 +145,8 @@ int aligner(int argc, char* argv[]) { tracker.summarize(); spdlog::info("> finished alignment"); - + spdlog::info("> total/primary/unmapped {}/{}/{}", hts_writer_ref.get_total(), + hts_writer_ref.get_primary(), hts_writer_ref.get_unmapped()); return 0; } diff --git a/dorado/cli/basecaller.cpp b/dorado/cli/basecaller.cpp index 92f7f88d4..74eb1f7ad 100644 --- a/dorado/cli/basecaller.cpp +++ b/dorado/cli/basecaller.cpp @@ -148,11 +148,10 @@ void setup(std::vector args, {read_converter}, min_qscore, default_parameters.min_sequence_length, std::unordered_set{}, thread_allocations.read_filter_threads); - auto mod_base_caller_node = PipelineDescriptor::InvalidNodeHandle; auto basecaller_node_sink = read_filter_node; if (!remora_runners.empty()) { - mod_base_caller_node = pipeline_desc.add_node( + auto mod_base_caller_node = pipeline_desc.add_node( {read_filter_node}, std::move(remora_runners), thread_allocations.remora_threads * num_devices, model_stride, remora_batch_size); basecaller_node_sink = mod_base_caller_node; diff --git a/dorado/read_pipeline/HtsWriter.cpp b/dorado/read_pipeline/HtsWriter.cpp index 56f1a8b2f..24d8eb592 100644 --- a/dorado/read_pipeline/HtsWriter.cpp +++ b/dorado/read_pipeline/HtsWriter.cpp @@ -58,7 +58,6 @@ HtsWriter::~HtsWriter() { terminate_impl(); sam_hdr_destroy(m_header); hts_close(m_file); - spdlog::info("> total/primary/unmapped {}/{}/{}", m_total, m_primary, m_unmapped); } HtsWriter::OutputMode HtsWriter::get_output_mode(const std::string& mode) { diff --git a/dorado/read_pipeline/HtsWriter.h b/dorado/read_pipeline/HtsWriter.h index f0764ee4a..47c4cad27 100644 --- a/dorado/read_pipeline/HtsWriter.h +++ b/dorado/read_pipeline/HtsWriter.h @@ -28,6 +28,9 @@ class HtsWriter : public MessageSink { int set_and_write_header(const sam_hdr_t* header); static OutputMode get_output_mode(const std::string& mode); + size_t get_total() const { return m_total; } + size_t get_primary() const { return m_primary; } + size_t get_unmapped() const { return m_unmapped; } private: void terminate_impl(); diff --git a/dorado/read_pipeline/ReadPipeline.cpp b/dorado/read_pipeline/ReadPipeline.cpp index 199947a68..d736cd388 100644 --- a/dorado/read_pipeline/ReadPipeline.cpp +++ b/dorado/read_pipeline/ReadPipeline.cpp @@ -374,7 +374,8 @@ void MessageSink::send_message_to_sink(int sink_index, Message &&message) { void Pipeline::push_message(Message &&message) { assert(!m_nodes.empty()); - dynamic_cast(*m_nodes.back()).push_message(std::move(message)); + const auto source_node_index = m_source_to_sink_order.front(); + dynamic_cast(*m_nodes.at(source_node_index)).push_message(std::move(message)); } stats::NamedStats Pipeline::terminate() { diff --git a/tests/PipelineTest.cpp b/tests/PipelineTest.cpp index d9d69d21a..83833b12f 100644 --- a/tests/PipelineTest.cpp +++ b/tests/PipelineTest.cpp @@ -1,3 +1,4 @@ +#include "MessageSinkUtils.h" #include "read_pipeline/NullNode.h" #include "read_pipeline/ReadPipeline.h" @@ -89,7 +90,7 @@ TEST_CASE("Creation", TEST_GROUP) { } // Tests destruction order of a random linear pipeline. -TEST_CASE("LinearDestructionOrder") { +TEST_CASE("LinearDestructionOrder", TEST_GROUP) { // Node that records destruction order. class OrderTestNode : public MessageSink { public: @@ -136,3 +137,35 @@ TEST_CASE("LinearDestructionOrder") { // Verify that nodes were destroyed in source-to-sink order. CHECK(std::equal(destruction_order.cbegin(), destruction_order.cend(), indices.cbegin())); } + +// Test inputs flow in the expected way from the source node. +TEST_CASE("PipelineFlow", TEST_GROUP) { + // NullNode passes nothing on, so the sink should get no messages + // if they are sent to that node first. + { + // Natural construction order: sink to source. + PipelineDescriptor pipeline_desc; + std::vector messages; + auto sink = pipeline_desc.add_node({}, 100, messages); + pipeline_desc.add_node({sink}); + auto pipeline = dorado::Pipeline::create(std::move(pipeline_desc)); + REQUIRE(pipeline != nullptr); + pipeline->push_message(std::shared_ptr()); + pipeline.reset(); + CHECK(messages.size() == 0); + } + + { + // Peverse construction order: source to sink. + PipelineDescriptor pipeline_desc; + std::vector messages; + auto null_node = pipeline_desc.add_node({}); + auto sink = pipeline_desc.add_node({}, 100, messages); + pipeline_desc.add_node_sink(null_node, sink); + auto pipeline = dorado::Pipeline::create(std::move(pipeline_desc)); + REQUIRE(pipeline != nullptr); + pipeline->push_message(std::shared_ptr()); + pipeline.reset(); + CHECK(messages.size() == 0); + } +} \ No newline at end of file