diff --git a/sambamba/pileup.d b/sambamba/pileup.d index 9acc4c90..4b5d05d4 100644 --- a/sambamba/pileup.d +++ b/sambamba/pileup.d @@ -1,6 +1,6 @@ /* This file is part of Sambamba. - Copyright (C) 2012-2015 Artem Tarasov + Copyright (C) 2012-2016 Artem Tarasov Sambamba is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -357,6 +357,8 @@ class ChunkDispatcher(ChunkRange) { private FileFormat format_; private std.stdio.File output_file_; private size_t max_queue_length_; + private size_t max_queue_data_size_; + private size_t curr_queue_data_size_; private Mutex mutex_, queue_mutex_; private Condition queue_not_empty_condition_, queue_not_full_condition_; @@ -368,7 +370,8 @@ class ChunkDispatcher(ChunkRange) { alias ElementType!(Unqual!(ChunkRange)) Chunk; this(string tmp_dir, ChunkRange chunks, MultiBamReader bam, - FileFormat format, std.stdio.File output_file, size_t max_queue_length) { + FileFormat format, std.stdio.File output_file, size_t max_queue_length, + size_t max_queue_data_size) { tmp_dir_ = tmp_dir; chunks_ = chunks; bam_ = bam; @@ -382,6 +385,7 @@ class ChunkDispatcher(ChunkRange) { format_ = format; output_file_ = output_file; max_queue_length_ = max_queue_length; + max_queue_data_size_ = max_queue_data_size; } Nullable!(Tuple!(Chunk, string, size_t)) nextChunk() { @@ -435,11 +439,24 @@ class ChunkDispatcher(ChunkRange) { void queueResult(size_t num, char[] data) { synchronized(queue_mutex_) { - while(result_queue_.length >= max_queue_length_ && num != curr_num_) { - stderr.writeln("[chunk waiting for dump queue] ", num, " (output is too slow: reduce threads or improve output speed)"); - queue_not_full_condition_.wait(); + while (true) { // wait until it's good time to put the result into the output queue + if (num == curr_num_) // always put if it should be written out now + break; + if (result_queue_.empty) // always put if the queue is empty + break; + + if (result_queue_.length >= max_queue_length_) { + stderr.writeln("[chunk waiting for dump queue] ", num, " (output is too slow: reduce threads or improve output speed)"); + queue_not_full_condition_.wait(); + } else if (curr_queue_data_size_ + data.length > max_queue_data_size_) { + stderr.writeln("[chunk waiting for dump queue] ", num, " (output is too large: increase buffer size)"); + queue_not_full_condition_.wait(); + } else { + break; + } } result_queue_.insert(Result(num, data)); + curr_queue_data_size_ += data.length; queue_not_empty_condition_.notify(); } stderr.writeln("[chunk queued for dumping] ", num); @@ -454,6 +471,7 @@ class ChunkDispatcher(ChunkRange) { } result = result_queue_.front; result_queue_.popFront(); + curr_queue_data_size_ -= result.data.length; ++curr_num_; queue_not_full_condition_.notifyAll(); } @@ -538,8 +556,8 @@ void worker(Dispatcher)(Dispatcher d, auto chunkDispatcher(ChunkRange)(string tmp_dir, ChunkRange chunks, MultiBamReader bam, FileFormat format, - std.stdio.File output_file, size_t max_queue_length) { - return new ChunkDispatcher!ChunkRange(tmp_dir, chunks, bam, format, output_file, max_queue_length); + std.stdio.File output_file, size_t max_queue_length, size_t max_queue_data_size) { + return new ChunkDispatcher!ChunkRange(tmp_dir, chunks, bam, format, output_file, max_queue_length, max_queue_data_size); } void printUsage() { @@ -578,6 +596,8 @@ void printUsage() { stderr.writeln(" maximum number of threads to use"); stderr.writeln(" -b, --buffer-size=64_000_000"); stderr.writeln(" chunk size (in bytes)"); + stderr.writeln(" -B, --output-buffer-size=512_000_000"); + stderr.writeln(" output buffer size (in bytes)"); } version(standalone) { @@ -613,6 +633,7 @@ int pileup_main(string[] args) { uint n_threads = defaultPoolThreads; std.stdio.File output_file = stdout; size_t buffer_size = 64_000_000; + size_t output_buffer_size = 512_000_000; string tmp_dir_prefix = defaultTmpDir(); @@ -624,7 +645,8 @@ int pileup_main(string[] args) { "output-filename|o", &output_filename, "tmpdir", &tmp_dir_prefix, "nthreads|t", &n_threads, - "buffer-size|b", &buffer_size); + "buffer-size|b", &buffer_size, + "output-buffer-size|B", &output_buffer_size); if (own_args.length < 2) { printUsage(); @@ -658,7 +680,8 @@ int pileup_main(string[] args) { } auto chunks = reads.pileupChunks(false, buffer_size); - auto dispatcher = chunkDispatcher(tmp_dir, chunks, bam, bundled_args.input_format, output_file, 2 * n_threads); + auto dispatcher = chunkDispatcher(tmp_dir, chunks, bam, bundled_args.input_format, output_file, + 2 * n_threads, output_buffer_size); auto writer = new Thread(&dispatcher.dumpResults); writer.start();