Skip to content

Commit

Permalink
Bug fixes for reads >= 255:
Browse files Browse the repository at this point in the history
1. use 16 bits offset (first_0_out & last_0_in) to capture candidate reads for mercy k-mer
2. batch size limited to 256M when reading binary reads (gzread max size 4G?)
  • Loading branch information
voutcn committed Mar 4, 2015
1 parent 021c346 commit 06f84d5
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 18 deletions.
27 changes: 16 additions & 11 deletions cx1_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,18 @@ void InitGlobalData(struct global_data_t &globals) {
globals.permutation_to_output = (uint32_t *) MallocAndCheck(globals.max_lv2_items * sizeof(uint32_t), __FILE__, __LINE__);
globals.lv2_read_info = (int64_t *) MallocAndCheck(globals.max_lv2_items * sizeof(int64_t), __FILE__, __LINE__);
globals.lv2_read_info_to_output = (int64_t *) MallocAndCheck(globals.max_lv2_items * sizeof(int64_t), __FILE__, __LINE__);
#ifdef LONG_READS
globals.first_0_out = (uint16_t*) MallocAndCheck(globals.num_reads * sizeof(uint16_t), __FILE__, __LINE__);
globals.last_0_in = (uint16_t*) MallocAndCheck(globals.num_reads * sizeof(uint16_t), __FILE__, __LINE__);
#else
globals.first_0_out = (unsigned char*) MallocAndCheck(globals.num_reads * sizeof(unsigned char), __FILE__, __LINE__);
globals.last_0_in = (unsigned char*) MallocAndCheck(globals.num_reads * sizeof(unsigned char), __FILE__, __LINE__);
#endif
#ifdef DISABLE_GPU
globals.cpu_sort_space = (uint64_t*) MallocAndCheck(sizeof(uint64_t) * globals.max_lv2_items, __FILE__, __LINE__);
#endif
memset(globals.first_0_out, 0xFF, globals.num_reads * sizeof(unsigned char));
memset(globals.last_0_in, 0xFF, globals.num_reads * sizeof(unsigned char));
memset(globals.first_0_out, 0xFF, globals.num_reads * sizeof(globals.first_0_out[0]));
memset(globals.last_0_in, 0xFF, globals.num_reads * sizeof(globals.last_0_in[0]));

// --- write the edge file header ---
globals.word_writer[0].output(globals.kmer_k);
Expand Down Expand Up @@ -867,8 +872,8 @@ void* Lv2CountingThread(void *_op) {
if (strand == 0) {
// update last
while (true) {
unsigned char old_value = globals.last_0_in[read_id];
if (old_value != 255 && old_value >= offset) { break; }
auto old_value = globals.last_0_in[read_id];
if (old_value != kSentinelOffset && old_value >= offset) { break; }
if (__sync_bool_compare_and_swap(globals.last_0_in + read_id, old_value, offset)) {
break;
}
Expand All @@ -877,7 +882,7 @@ void* Lv2CountingThread(void *_op) {
// update first
offset++;
while (true) {
unsigned char old_value = globals.first_0_out[read_id];
auto old_value = globals.first_0_out[read_id];
if (old_value <= offset) { break; }
if (__sync_bool_compare_and_swap(globals.first_0_out + read_id, old_value, offset)) {
break;
Expand All @@ -898,7 +903,7 @@ void* Lv2CountingThread(void *_op) {
// update first
offset++;
while (true) {
unsigned char old_value = globals.first_0_out[read_id];
auto old_value = globals.first_0_out[read_id];
if (old_value <= offset) { break; }
if (__sync_bool_compare_and_swap(globals.first_0_out + read_id, old_value, offset)) {
break;
Expand All @@ -907,8 +912,8 @@ void* Lv2CountingThread(void *_op) {
} else {
// update last
while (true) {
unsigned char old_value = globals.last_0_in[read_id];
if (old_value != 255 && old_value >= offset) { break; }
auto old_value = globals.last_0_in[read_id];
if (old_value != kSentinelOffset && old_value >= offset) { break; }
if (__sync_bool_compare_and_swap(globals.last_0_in + read_id, old_value, offset)) {
break;
}
Expand Down Expand Up @@ -1128,9 +1133,9 @@ void Phase1Entry(struct global_data_t &globals) {
int64_t num_has_tips = 0;
FILE *candidate_file = OpenFileAndCheck((std::string(globals.output_prefix) + ".cand").c_str(), "wb");
for (int64_t i = 0; i < globals.num_reads; ++i) {
unsigned char first = globals.first_0_out[i];
unsigned char last = globals.last_0_in[i];
if (first != 255 && last != 255) {
auto first = globals.first_0_out[i];
auto last = globals.last_0_in[i];
if (first != kSentinelOffset && last != kSentinelOffset) {
++num_has_tips;
if (last > first) {
++num_candidate_reads;
Expand Down
16 changes: 10 additions & 6 deletions io-utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ struct ContigPackage {

struct ReadPackage {
const static int kMaxNumReads = (1 << 22); // tunable
const static int kMaxBytePerBatch = (1 << 28); // 256M, tunable
int reads_per_batch;

ReadPackage(int max_read_len) {
init(max_read_len);
Expand All @@ -106,7 +108,9 @@ struct ReadPackage {
}
words_per_read = (max_read_len * 2 + offset_bits + 31) / 32;
printf("words per read: %d\n", words_per_read);
packed_reads = (edge_word_t*) MallocAndCheck(sizeof(edge_word_t) * words_per_read * kMaxNumReads, __FILE__, __LINE__);
reads_per_batch = kMaxBytePerBatch / (words_per_read * 4);
reads_per_batch = reads_per_batch > kMaxNumReads ? kMaxNumReads : reads_per_batch;
packed_reads = (edge_word_t*) MallocAndCheck(sizeof(edge_word_t) * words_per_read * reads_per_batch, __FILE__, __LINE__);
assert(packed_reads != NULL);
}

Expand All @@ -121,7 +125,7 @@ struct ReadPackage {
edge_word_t *cur_p = packed_reads;
while (kseq_read(seq) >= 0) {
if (int(seq->seq.l) > max_read_len) {
fprintf(stderr, "Warning, a read longer the max_read_len: %d\n", max_read_len);
fprintf(stderr, "Warning, a read longer than max_read_len: %d > %d\n", (int)(seq->seq.l), max_read_len);
continue;
}

Expand Down Expand Up @@ -150,7 +154,7 @@ struct ReadPackage {

num_of_reads++;
cur_p += words_per_read;
if (num_of_reads >= kMaxNumReads) {
if (num_of_reads >= reads_per_batch) {
break;
}
}
Expand All @@ -161,7 +165,7 @@ struct ReadPackage {
edge_word_t *cur_p = packed_reads;
while (kseq_read(seq) >= 0) {
if ((int)(seq->seq.l) > max_read_len) {
fprintf(stderr, "Warning, a read longer the max_read_len: %d\n", max_read_len);
fprintf(stderr, "Warning, a read longer than max_read_len: %d > %d\n", (int)(seq->seq.l), max_read_len);
continue;
}

Expand Down Expand Up @@ -190,15 +194,15 @@ struct ReadPackage {

num_of_reads++;
cur_p += words_per_read;
if (num_of_reads >= kMaxNumReads) {
if (num_of_reads >= reads_per_batch) {
break;
}
}
}

void ReadBinaryReads(gzFile read_file) {
int num_bytes = gzread(read_file, packed_reads,
sizeof(edge_word_t) * words_per_read * kMaxNumReads);
sizeof(edge_word_t) * words_per_read * reads_per_batch);
assert(num_bytes % (words_per_read * sizeof(edge_word_t)) == 0);
num_of_reads = num_bytes / (words_per_read * sizeof(edge_word_t));
}
Expand Down
2 changes: 1 addition & 1 deletion megahit
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import multiprocessing

from datetime import datetime, date, time

megahit_version_str = "MEGAHIT v0.2.0"
megahit_version_str = "MEGAHIT v0.2.0-a"
usage_message = '''
Copyright (c) The University of Hong Kong
Expand Down
14 changes: 14 additions & 0 deletions sdbg_builder_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
#include "timer.h"
#include "sdbg_builder_writers.h"

// uncomment the following line if read lenght > 254 bp; comment it if read length <= 254 and want to save memory
#define LONG_READS

struct global_data_t;
struct readpartition_data_t;
struct bucketpartition_data_t;
Expand Down Expand Up @@ -124,8 +127,13 @@ struct global_data_t {

// big arrays
edge_word_t* packed_reads;
#ifndef LONG_READS
unsigned char *first_0_out;
unsigned char *last_0_in; // first potential 0-out-degree k-mer and last potential 0-in-degree k-mer
#else
uint16_t *first_0_out;
uint16_t *last_0_in;
#endif
int64_t *lv2_read_info; // to store where this lv2_item (k+1)-mer come from
int64_t *lv2_read_info_to_output;

Expand Down Expand Up @@ -191,6 +199,12 @@ static const int kTopCharShift = kBitsPerEdgeWord - kBitsPerEdgeChar; // bits >>
static const uint32_t kDifferentialLimit = 2147483647; // 32-bit signed max int
static const int kSignBitMask = 0x80000000; // the MSB of 32-bit

#ifndef LONG_READS
static const int kSentinelOffset = 255;
#else
static const int kSentinelOffset = 65535;
#endif

// can be modified, efficiency related
static const int64_t kMinLv2BatchSize = 2097152;
static const int kDefaultLv1ScanTime = 8;
Expand Down

0 comments on commit 06f84d5

Please sign in to comment.