Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue total data size limit #253

Merged
merged 1 commit into from
Oct 24, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions sambamba/pileup.d
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
This file is part of Sambamba.
Copyright (C) 2012-2015 Artem Tarasov <lomereiter@gmail.com>
Copyright (C) 2012-2016 Artem Tarasov <lomereiter@gmail.com>

Sambamba is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -359,6 +359,8 @@ class ChunkDispatcher(ChunkRange) {
private size_t max_queue_length_;
private int prev_ref_id;
private ulong prev_pos_diff;
private size_t max_queue_data_size_;
private size_t curr_queue_data_size_;

private Mutex mutex_, queue_mutex_;
private Condition queue_not_empty_condition_, queue_not_full_condition_;
Expand All @@ -370,7 +372,8 @@ class ChunkDispatcher(ChunkRange) {
alias ElementType!(Unqual!(ChunkRange)) Chunk;

this(string tmp_dir, ChunkRange chunks, MultiBamReader bam,
FileFormat format, std.stdio.File output_file, size_t max_queue_length) {
FileFormat format, std.stdio.File output_file, size_t max_queue_length,
size_t max_queue_data_size) {
tmp_dir_ = tmp_dir;
chunks_ = chunks;
bam_ = bam;
Expand All @@ -384,6 +387,7 @@ class ChunkDispatcher(ChunkRange) {
format_ = format;
output_file_ = output_file;
max_queue_length_ = max_queue_length;
max_queue_data_size_ = max_queue_data_size;
}

Nullable!(Tuple!(Chunk, string, size_t)) nextChunk() {
Expand Down Expand Up @@ -447,11 +451,24 @@ class ChunkDispatcher(ChunkRange) {

void queueResult(size_t num, char[] data) {
synchronized(queue_mutex_) {
while(result_queue_.length >= max_queue_length_ && num != curr_num_) {
stderr.writeln("[chunk waiting for dump queue] ", num, " (output is too slow: reduce threads or improve output speed)");
queue_not_full_condition_.wait();
while (true) { // wait until it's good time to put the result into the output queue
if (num == curr_num_) // always put if it should be written out now
break;
if (result_queue_.empty) // always put if the queue is empty
break;

if (result_queue_.length >= max_queue_length_) {
stderr.writeln("[chunk waiting for dump queue] ", num, " (output is too slow: reduce threads or improve output speed)");
queue_not_full_condition_.wait();
} else if (curr_queue_data_size_ + data.length > max_queue_data_size_) {
stderr.writeln("[chunk waiting for dump queue] ", num, " (output is too large: increase buffer size)");
queue_not_full_condition_.wait();
} else {
break;
}
}
result_queue_.insert(Result(num, data));
curr_queue_data_size_ += data.length;
queue_not_empty_condition_.notify();
}
stderr.writeln("[chunk queued for dumping] ", num);
Expand All @@ -466,6 +483,7 @@ class ChunkDispatcher(ChunkRange) {
}
result = result_queue_.front;
result_queue_.popFront();
curr_queue_data_size_ -= result.data.length;
++curr_num_;
queue_not_full_condition_.notifyAll();
}
Expand Down Expand Up @@ -550,8 +568,8 @@ void worker(Dispatcher)(Dispatcher d,

auto chunkDispatcher(ChunkRange)(string tmp_dir, ChunkRange chunks,
MultiBamReader bam, FileFormat format,
std.stdio.File output_file, size_t max_queue_length) {
return new ChunkDispatcher!ChunkRange(tmp_dir, chunks, bam, format, output_file, max_queue_length);
std.stdio.File output_file, size_t max_queue_length, size_t max_queue_data_size) {
return new ChunkDispatcher!ChunkRange(tmp_dir, chunks, bam, format, output_file, max_queue_length, max_queue_data_size);
}

void printUsage() {
Expand Down Expand Up @@ -590,6 +608,8 @@ void printUsage() {
stderr.writeln(" maximum number of threads to use");
stderr.writeln(" -b, --buffer-size=64_000_000");
stderr.writeln(" chunk size (in bytes)");
stderr.writeln(" -B, --output-buffer-size=512_000_000");
stderr.writeln(" output buffer size (in bytes)");
}

version(standalone) {
Expand Down Expand Up @@ -625,6 +645,7 @@ int pileup_main(string[] args) {
uint n_threads = defaultPoolThreads;
std.stdio.File output_file = stdout;
size_t buffer_size = 64_000_000;
size_t output_buffer_size = 512_000_000;

string tmp_dir_prefix = defaultTmpDir();

Expand All @@ -636,7 +657,8 @@ int pileup_main(string[] args) {
"output-filename|o", &output_filename,
"tmpdir", &tmp_dir_prefix,
"nthreads|t", &n_threads,
"buffer-size|b", &buffer_size);
"buffer-size|b", &buffer_size,
"output-buffer-size|B", &output_buffer_size);

if (own_args.length < 2) {
printUsage();
Expand Down Expand Up @@ -670,7 +692,8 @@ int pileup_main(string[] args) {
}

auto chunks = reads.pileupChunks(false, buffer_size);
auto dispatcher = chunkDispatcher(tmp_dir, chunks, bam, bundled_args.input_format, output_file, 2 * n_threads);
auto dispatcher = chunkDispatcher(tmp_dir, chunks, bam, bundled_args.input_format, output_file,
2 * n_threads, output_buffer_size);

auto writer = new Thread(&dispatcher.dumpResults);
writer.start();
Expand Down