Skip to content

Commit

Permalink
Fix some bugs in splitting handling
Browse files Browse the repository at this point in the history
  • Loading branch information
martin-steinegger committed Jan 16, 2020
1 parent d9a8874 commit 01db79d
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 19 deletions.
5 changes: 2 additions & 3 deletions src/linclust/kmerindexdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,12 @@ int kmerindexdb(int argc, const char **argv, const Command &command) {
size_t totalKmers = computeKmerCount(seqDbr, KMER_SIZE, chooseTopKmer);
totalKmers *= par.pickNbest;
size_t totalSizeNeeded = computeMemoryNeededLinearfilter<short>(totalKmers);
Debug(Debug::INFO) << "Estimated memory consumption " << totalSizeNeeded/1024/1024 << " MB\n";
// compute splits
size_t splits = static_cast<size_t>(std::ceil(static_cast<float>(totalSizeNeeded) / memoryLimit));
size_t totalKmersPerSplit = static_cast<size_t>(std::min(totalSizeNeeded,memoryLimit)/sizeof(KmerPosition<short>));
std::vector<std::pair<size_t, size_t>> hashRanges = setupKmerSplits<short>(par, subMat, seqDbr, totalKmersPerSplit, splits);

Debug(Debug::INFO) << "Process file into " << splits << " parts\n";
Debug(Debug::INFO) << "Process file into " << hashRanges.size() << " parts\n";
std::vector<std::string> splitFiles;
KmerPosition<short> *hashSeqPair = NULL;

Expand All @@ -103,7 +102,7 @@ int kmerindexdb(int argc, const char **argv, const Command &command) {
memset(splitCntPerProc, 0, sizeof(unsigned int) * MMseqsMPI::numProc);
for(size_t i = 0; i < splits; i++){
splitCntPerProc[i % MMseqsMPI::numProc] += 1;
}
}Estimated memory consumption
for(int i = 0; i < MMseqsMPI::rank; i++){
fromSplit += splitCntPerProc[i];
}
Expand Down
36 changes: 22 additions & 14 deletions src/linclust/kmermatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include <algorithm>
#include <sys/mman.h>
#include <fcntl.h>
#include <sys/stat.h>


#ifdef OPENMP
#include <omp.h>
Expand Down Expand Up @@ -236,13 +238,13 @@ std::pair<size_t, size_t> fillKmerPositionArray(KmerPosition<T> * kmerArray, siz
int tooMuchElemInLastBin = (kmerInBins - kmerConsidered);

// add k-mer to represent the identity
if (hashStartRange <= static_cast<unsigned short>(seqHash) && static_cast<unsigned short>(seqHash) <= hashEndRange) {
if (static_cast<unsigned short>(seqHash) >= hashStartRange && static_cast<unsigned short>(seqHash) <= hashEndRange) {
threadKmerBuffer[bufferPos].kmer = seqHash;
threadKmerBuffer[bufferPos].id = seqId;
threadKmerBuffer[bufferPos].pos = 0;
threadKmerBuffer[bufferPos].seqLen = seq.L;
if(hashDistribution != NULL){
hashDistribution[static_cast<unsigned short>(seqHash)]++;
__sync_fetch_and_add(&hashDistribution[static_cast<unsigned short>(seqHash)], 1);
}
bufferPos++;
if (bufferPos >= BUFFER_SIZE) {
Expand Down Expand Up @@ -297,7 +299,7 @@ std::pair<size_t, size_t> fillKmerPositionArray(KmerPosition<T> * kmerArray, siz
// std::cout << seqId << "\t" << (kmers + kmerIdx)->score << "\t" << (kmers + kmerIdx)->pos << std::endl;

selectedKmer++;
if (hashStartRange <= (kmers + kmerIdx)->score && (kmers + kmerIdx)->score < hashEndRange)
if ((kmers + kmerIdx)->score >= hashStartRange && (kmers + kmerIdx)->score <= hashEndRange)
{
// std::cout << seqId << "\t" << (kmers + kmerIdx)->score << "\t" << (kmers + kmerIdx)->pos << std::endl;
threadKmerBuffer[bufferPos].kmer = (kmers + kmerIdx)->kmer;
Expand All @@ -306,7 +308,7 @@ std::pair<size_t, size_t> fillKmerPositionArray(KmerPosition<T> * kmerArray, siz
threadKmerBuffer[bufferPos].seqLen = seq.L;
bufferPos++;
if(hashDistribution != NULL){
hashDistribution[(kmers + kmerIdx)->score]++;
__sync_fetch_and_add(&hashDistribution[(kmers + kmerIdx)->score], 1);
}

if (bufferPos >= BUFFER_SIZE) {
Expand Down Expand Up @@ -384,7 +386,7 @@ KmerPosition<T> * doComputation(size_t totalKmers, size_t hashStartRange, size_t
std::pair<size_t, size_t > ret = fillKmerPositionArray<Parameters::DBTYPE_AMINO_ACIDS, T>(hashSeqPair, totalKmers, seqDbr, par, subMat, true, hashStartRange, hashEndRange, NULL);
elementsToSort = ret.first;
}
if(hashEndRange == 1){
if(hashEndRange == SIZE_T_MAX){
seqDbr.unmapData();
}

Expand Down Expand Up @@ -596,21 +598,20 @@ int kmermatcherInner(Parameters& par, DBReader<unsigned int>& seqDbr) {
// memoryLimit in bytes
size_t memoryLimit;
if (par.splitMemoryLimit > 0) {
memoryLimit = static_cast<size_t>(par.splitMemoryLimit);
memoryLimit = par.splitMemoryLimit;
} else {
memoryLimit = static_cast<size_t>(Util::getTotalSystemMemory() * 0.9);
}
Debug(Debug::INFO) << "\n";
size_t totalKmers = computeKmerCount(seqDbr, par.kmerSize, par.kmersPerSequence, par.kmersPerSequenceScale);
size_t totalSizeNeeded = computeMemoryNeededLinearfilter<T>(totalKmers);
Debug(Debug::INFO) << "Estimated memory consumption " << totalSizeNeeded/1024/1024 << " MB\n";
// compute splits
size_t splits = static_cast<size_t>(std::ceil(static_cast<float>(totalSizeNeeded) / memoryLimit));
size_t totalKmersPerSplit = static_cast<size_t>(std::min(totalSizeNeeded,memoryLimit)/sizeof(KmerPosition<short>));

std::vector<std::pair<size_t, size_t>> hashRanges = setupKmerSplits<T>(par, subMat, seqDbr, totalKmersPerSplit, splits);
if(splits > 1){
Debug(Debug::INFO) << "Process file into " << splits << " parts\n";
Debug(Debug::INFO) << "Process file into " << hashRanges.size() << " parts\n";
}
std::vector<std::string> splitFiles;
KmerPosition<T> *hashSeqPair = NULL;
Expand Down Expand Up @@ -744,7 +745,7 @@ std::vector<std::pair<size_t, size_t>> setupKmerSplits(Parameters &par, BaseMatr
}
}
if(maxBucketSize > totalKmers){
Debug(Debug::INFO) << "Not enough memory run kmermatcher. Minimum is at least " << maxBucketSize* sizeof(KmerPosition<T>) << " bytes\n";
Debug(Debug::INFO) << "Not enough memory to run the kmermatcher. Minimum is at least " << maxBucketSize* sizeof(KmerPosition<T>) << " bytes\n";
EXIT(EXIT_FAILURE);
}
// define splits
Expand Down Expand Up @@ -948,12 +949,19 @@ void mergeKmerFilesAndOutput(DBWriter & dbw,
for(size_t file = 0; file < tmpFiles.size(); file++){
files[file] = FileUtil::openFileOrDie(tmpFiles[file].c_str(),"r",true);
size_t dataSize;
entries[file] = (T*)FileUtil::mmapFile(files[file], &dataSize);
struct stat sb;
fstat(fileno(files[file]) , &sb);
if(sb.st_size > 0){
entries[file] = (T*)FileUtil::mmapFile(files[file], &dataSize);
#if HAVE_POSIX_MADVISE
if (posix_madvise (entries[file], dataSize, POSIX_MADV_SEQUENTIAL) != 0){
Debug(Debug::ERROR) << "posix_madvise returned an error for file " << tmpFiles[file] << "\n";
}
if (posix_madvise (entries[file], dataSize, POSIX_MADV_SEQUENTIAL) != 0){
Debug(Debug::ERROR) << "posix_madvise returned an error for file " << tmpFiles[file] << "\n";
}
#endif
}else{
dataSize = 0;
}

dataSizes[file] = dataSize;
entrySizes[file] = dataSize/sizeof(T);
}
Expand Down Expand Up @@ -1068,7 +1076,7 @@ void mergeKmerFilesAndOutput(DBWriter & dbw,
}
for(size_t file = 0; file < tmpFiles.size(); file++) {
fclose(files[file]);
if(munmap((void*)entries[file], dataSizes[file]) < 0){
if(dataSizes[file] > 0 && munmap((void*)entries[file], dataSizes[file]) < 0){
Debug(Debug::ERROR) << "Failed to munmap memory dataSize=" << dataSizes[file] <<"\n";
EXIT(EXIT_FAILURE);
}
Expand Down
3 changes: 1 addition & 2 deletions src/linclust/kmersearch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ int kmersearch(int argc, const char **argv, const Command &command) {

size_t totalKmers = computeKmerCount(queryDbr, KMER_SIZE, chooseTopKmer);
size_t totalSizeNeeded = computeMemoryNeededLinearfilter<short>(totalKmers);
Debug(Debug::INFO) << "Estimated memory consumption " << totalSizeNeeded/1024/1024 << " MB\n";

BaseMatrix *subMat;
if (Parameters::isEqualDbtype(querySeqType, Parameters::DBTYPE_NUCLEOTIDES)) {
Expand All @@ -222,7 +221,7 @@ int kmersearch(int argc, const char **argv, const Command &command) {
std::vector<std::pair<size_t, size_t>> hashRanges = setupKmerSplits<short>(par, subMat, queryDbr, totalKmersPerSplit, splits);

int outDbType = (Parameters::isEqualDbtype(queryDbr.getDbtype(), Parameters::DBTYPE_NUCLEOTIDES)) ? Parameters::DBTYPE_PREFILTER_REV_RES : Parameters::DBTYPE_PREFILTER_RES;
Debug(Debug::INFO) << "Process file into " << splits << " parts\n";
Debug(Debug::INFO) << "Process file into " << hashRanges.size() << " parts\n";

std::vector<std::string> splitFiles;
for (size_t split = 0; split < hashRanges.size(); split++) {
Expand Down

0 comments on commit 01db79d

Please sign in to comment.