Skip to content

Commit

Permalink
queue total data size limit
Browse files Browse the repository at this point in the history
  • Loading branch information
Artem Tarasov committed Oct 3, 2016
1 parent e1070e1 commit dd40435
Showing 1 changed file with 32 additions and 9 deletions.
41 changes: 32 additions & 9 deletions sambamba/pileup.d
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
This file is part of Sambamba.
Copyright (C) 2012-2015 Artem Tarasov <lomereiter@gmail.com>
Copyright (C) 2012-2016 Artem Tarasov <lomereiter@gmail.com>
Sambamba is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -357,6 +357,8 @@ class ChunkDispatcher(ChunkRange) {
private FileFormat format_;
private std.stdio.File output_file_;
private size_t max_queue_length_;
private size_t max_queue_data_size_;
private size_t curr_queue_data_size_;

private Mutex mutex_, queue_mutex_;
private Condition queue_not_empty_condition_, queue_not_full_condition_;
Expand All @@ -368,7 +370,8 @@ class ChunkDispatcher(ChunkRange) {
alias ElementType!(Unqual!(ChunkRange)) Chunk;

this(string tmp_dir, ChunkRange chunks, MultiBamReader bam,
FileFormat format, std.stdio.File output_file, size_t max_queue_length) {
FileFormat format, std.stdio.File output_file, size_t max_queue_length,
size_t max_queue_data_size) {
tmp_dir_ = tmp_dir;
chunks_ = chunks;
bam_ = bam;
Expand All @@ -382,6 +385,7 @@ class ChunkDispatcher(ChunkRange) {
format_ = format;
output_file_ = output_file;
max_queue_length_ = max_queue_length;
max_queue_data_size_ = max_queue_data_size;
}

Nullable!(Tuple!(Chunk, string, size_t)) nextChunk() {
Expand Down Expand Up @@ -435,11 +439,24 @@ class ChunkDispatcher(ChunkRange) {

void queueResult(size_t num, char[] data) {
synchronized(queue_mutex_) {
while(result_queue_.length >= max_queue_length_ && num != curr_num_) {
stderr.writeln("[chunk waiting for dump queue] ", num, " (output is too slow: reduce threads or improve output speed)");
queue_not_full_condition_.wait();
while (true) { // wait until it's good time to put the result into the output queue
if (num == curr_num_) // always put if it should be written out now
break;
if (result_queue_.empty) // always put if the queue is empty
break;

if (result_queue_.length >= max_queue_length_) {
stderr.writeln("[chunk waiting for dump queue] ", num, " (output is too slow: reduce threads or improve output speed)");
queue_not_full_condition_.wait();
} else if (curr_queue_data_size_ + data.length > max_queue_data_size_) {
stderr.writeln("[chunk waiting for dump queue] ", num, " (output is too large: increase buffer size)");
queue_not_full_condition_.wait();
} else {
break;
}
}
result_queue_.insert(Result(num, data));
curr_queue_data_size_ += data.length;
queue_not_empty_condition_.notify();
}
stderr.writeln("[chunk queued for dumping] ", num);
Expand All @@ -454,6 +471,7 @@ class ChunkDispatcher(ChunkRange) {
}
result = result_queue_.front;
result_queue_.popFront();
curr_queue_data_size_ -= result.data.length;
++curr_num_;
queue_not_full_condition_.notifyAll();
}
Expand Down Expand Up @@ -538,8 +556,8 @@ void worker(Dispatcher)(Dispatcher d,

auto chunkDispatcher(ChunkRange)(string tmp_dir, ChunkRange chunks,
MultiBamReader bam, FileFormat format,
std.stdio.File output_file, size_t max_queue_length) {
return new ChunkDispatcher!ChunkRange(tmp_dir, chunks, bam, format, output_file, max_queue_length);
std.stdio.File output_file, size_t max_queue_length, size_t max_queue_data_size) {
return new ChunkDispatcher!ChunkRange(tmp_dir, chunks, bam, format, output_file, max_queue_length, max_queue_data_size);
}

void printUsage() {
Expand Down Expand Up @@ -578,6 +596,8 @@ void printUsage() {
stderr.writeln(" maximum number of threads to use");
stderr.writeln(" -b, --buffer-size=64_000_000");
stderr.writeln(" chunk size (in bytes)");
stderr.writeln(" -B, --output-buffer-size=512_000_000");
stderr.writeln(" output buffer size (in bytes)");
}

version(standalone) {
Expand Down Expand Up @@ -613,6 +633,7 @@ int pileup_main(string[] args) {
uint n_threads = defaultPoolThreads;
std.stdio.File output_file = stdout;
size_t buffer_size = 64_000_000;
size_t output_buffer_size = 512_000_000;

string tmp_dir_prefix = defaultTmpDir();

Expand All @@ -624,7 +645,8 @@ int pileup_main(string[] args) {
"output-filename|o", &output_filename,
"tmpdir", &tmp_dir_prefix,
"nthreads|t", &n_threads,
"buffer-size|b", &buffer_size);
"buffer-size|b", &buffer_size,
"output-buffer-size|B", &output_buffer_size);

if (own_args.length < 2) {
printUsage();
Expand Down Expand Up @@ -658,7 +680,8 @@ int pileup_main(string[] args) {
}

auto chunks = reads.pileupChunks(false, buffer_size);
auto dispatcher = chunkDispatcher(tmp_dir, chunks, bam, bundled_args.input_format, output_file, 2 * n_threads);
auto dispatcher = chunkDispatcher(tmp_dir, chunks, bam, bundled_args.input_format, output_file,
2 * n_threads, output_buffer_size);

auto writer = new Thread(&dispatcher.dumpResults);
writer.start();
Expand Down

0 comments on commit dd40435

Please sign in to comment.