From a4710bbf3a6e120b34ba73ec50bc3c25212a4033 Mon Sep 17 00:00:00 2001 From: Artem Tarasov Date: Mon, 3 Oct 2016 12:56:21 +0200 Subject: [PATCH] queue total data size limit --- sambamba/pileup.d | 41 ++++++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/sambamba/pileup.d b/sambamba/pileup.d index 0a6ccec8..aed9a409 100644 --- a/sambamba/pileup.d +++ b/sambamba/pileup.d @@ -1,6 +1,6 @@ /* This file is part of Sambamba. - Copyright (C) 2012-2015 Artem Tarasov + Copyright (C) 2012-2016 Artem Tarasov Sambamba is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -359,6 +359,8 @@ class ChunkDispatcher(ChunkRange) { private size_t max_queue_length_; private int prev_ref_id; private ulong prev_pos_diff; + private size_t max_queue_data_size_; + private size_t curr_queue_data_size_; private Mutex mutex_, queue_mutex_; private Condition queue_not_empty_condition_, queue_not_full_condition_; @@ -370,7 +372,8 @@ class ChunkDispatcher(ChunkRange) { alias ElementType!(Unqual!(ChunkRange)) Chunk; this(string tmp_dir, ChunkRange chunks, MultiBamReader bam, - FileFormat format, std.stdio.File output_file, size_t max_queue_length) { + FileFormat format, std.stdio.File output_file, size_t max_queue_length, + size_t max_queue_data_size) { tmp_dir_ = tmp_dir; chunks_ = chunks; bam_ = bam; @@ -384,6 +387,7 @@ class ChunkDispatcher(ChunkRange) { format_ = format; output_file_ = output_file; max_queue_length_ = max_queue_length; + max_queue_data_size_ = max_queue_data_size; } Nullable!(Tuple!(Chunk, string, size_t)) nextChunk() { @@ -447,11 +451,24 @@ class ChunkDispatcher(ChunkRange) { void queueResult(size_t num, char[] data) { synchronized(queue_mutex_) { - while(result_queue_.length >= max_queue_length_ && num != curr_num_) { - stderr.writeln("[chunk waiting for dump queue] ", num, " (output is too slow: reduce threads or improve output speed)"); - queue_not_full_condition_.wait(); + while (true) { // wait until it's good time to put the result into the output queue + if (num == curr_num_) // always put if it should be written out now + break; + if (result_queue_.empty) // always put if the queue is empty + break; + + if (result_queue_.length >= max_queue_length_) { + stderr.writeln("[chunk waiting for dump queue] ", num, " (output is too slow: reduce threads or improve output speed)"); + queue_not_full_condition_.wait(); + } else if (curr_queue_data_size_ + data.length > max_queue_data_size_) { + stderr.writeln("[chunk waiting for dump queue] ", num, " (output is too large: increase buffer size)"); + queue_not_full_condition_.wait(); + } else { + break; + } } result_queue_.insert(Result(num, data)); + curr_queue_data_size_ += data.length; queue_not_empty_condition_.notify(); } stderr.writeln("[chunk queued for dumping] ", num); @@ -466,6 +483,7 @@ class ChunkDispatcher(ChunkRange) { } result = result_queue_.front; result_queue_.popFront(); + curr_queue_data_size_ -= result.data.length; ++curr_num_; queue_not_full_condition_.notifyAll(); } @@ -550,8 +568,8 @@ void worker(Dispatcher)(Dispatcher d, auto chunkDispatcher(ChunkRange)(string tmp_dir, ChunkRange chunks, MultiBamReader bam, FileFormat format, - std.stdio.File output_file, size_t max_queue_length) { - return new ChunkDispatcher!ChunkRange(tmp_dir, chunks, bam, format, output_file, max_queue_length); + std.stdio.File output_file, size_t max_queue_length, size_t max_queue_data_size) { + return new ChunkDispatcher!ChunkRange(tmp_dir, chunks, bam, format, output_file, max_queue_length, max_queue_data_size); } void printUsage() { @@ -590,6 +608,8 @@ void printUsage() { stderr.writeln(" maximum number of threads to use"); stderr.writeln(" -b, --buffer-size=64_000_000"); stderr.writeln(" chunk size (in bytes)"); + stderr.writeln(" -B, --output-buffer-size=512_000_000"); + stderr.writeln(" output buffer size (in bytes)"); } version(standalone) { @@ -625,6 +645,7 @@ int pileup_main(string[] args) { uint n_threads = defaultPoolThreads; std.stdio.File output_file = stdout; size_t buffer_size = 64_000_000; + size_t output_buffer_size = 512_000_000; string tmp_dir_prefix = defaultTmpDir(); @@ -636,7 +657,8 @@ int pileup_main(string[] args) { "output-filename|o", &output_filename, "tmpdir", &tmp_dir_prefix, "nthreads|t", &n_threads, - "buffer-size|b", &buffer_size); + "buffer-size|b", &buffer_size, + "output-buffer-size|B", &output_buffer_size); if (own_args.length < 2) { printUsage(); @@ -670,7 +692,8 @@ int pileup_main(string[] args) { } auto chunks = reads.pileupChunks(false, buffer_size); - auto dispatcher = chunkDispatcher(tmp_dir, chunks, bam, bundled_args.input_format, output_file, 2 * n_threads); + auto dispatcher = chunkDispatcher(tmp_dir, chunks, bam, bundled_args.input_format, output_file, + 2 * n_threads, output_buffer_size); auto writer = new Thread(&dispatcher.dumpResults); writer.start();