Skip to content

Commit

Permalink
round robin for multi-file read sketching and sketch id/comment options
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Ondov committed Sep 21, 2018
1 parent ea245b3 commit 03b28ed
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 40 deletions.
29 changes: 28 additions & 1 deletion src/mash/CommandSketch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ CommandSketch::CommandSketch()
useOption("help");
addOption("list", Option(Option::Boolean, "l", "Input", "List input. Lines in each <input> specify paths to sequence files, one per line.", ""));
addOption("prefix", Option(Option::File, "o", "Output", "Output prefix (first input file used if unspecified). The suffix '.msh' will be appended.", ""));
addOption("id", Option(Option::File, "I", "Sketch", "ID field for sketch of reads (instead of first sequence ID).", ""));
addOption("comment", Option(Option::File, "C", "Sketch", "Comment for a sketch of reads (instead of first sequence comment.", ""));
useSketchOptions();
}

Expand Down Expand Up @@ -79,7 +81,32 @@ int CommandSketch::run() const
}
}

sketch.initFromFiles(files, parameters, verbosity);
if ( getOption("id").active || getOption("comment").active )
{
if ( files.size() > 1 && ! parameters.reads )
{
cerr << "WARNING: -I and -C will only apply to first sketch" << endl;
}
}

if ( parameters.reads )
{
sketch.initFromReads(files, parameters);
}
else
{
sketch.initFromFiles(files, parameters, verbosity);
}

if ( getOption("id").active )
{
sketch.setReferenceName(0, getOption("id").argument);
}

if ( getOption("comment").active )
{
sketch.setReferenceComment(0, getOption("comment").argument);
}

double lengthThreshold = (parameters.warning * sketch.getKmerSpace()) / (1. - parameters.warning);

Expand Down
127 changes: 91 additions & 36 deletions src/mash/Sketch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <capnp/serialize.h>
#include <sys/mman.h>
#include <math.h>
#include <list>

#define SET_BINARY_MODE(file)
#define CHUNK 16384
Expand Down Expand Up @@ -91,6 +92,15 @@ uint64_t Sketch::getReferenceIndex(string id) const
}
}

void Sketch::initFromReads(const vector<string> & files, const Parameters & parametersNew)
{
parameters = parametersNew;

useThreadOutput(sketchFile(new SketchInput(files, 0, 0, "", "", parameters)));

createIndex();
}

int Sketch::initFromFiles(const vector<string> & files, const Parameters & parametersNew, int verbosity, bool enforceParameters, bool contain)
{
parameters = parametersNew;
Expand Down Expand Up @@ -155,9 +165,11 @@ int Sketch::initFromFiles(const vector<string> & files, const Parameters & param

// init fully
//
threadPool.runWhenThreadAvailable(new SketchInput(files[i], 0, 0, "", "", parameters), loadCapnp);
vector<string> file;
file.push_back(files[i]);
threadPool.runWhenThreadAvailable(new SketchInput(file, 0, 0, "", "", parameters), loadCapnp);
}
else
else
{
FILE * inStream;

Expand Down Expand Up @@ -192,8 +204,10 @@ int Sketch::initFromFiles(const vector<string> & files, const Parameters & param
{
fclose(inStream);
}

threadPool.runWhenThreadAvailable(new SketchInput(files[i], 0, 0, "", "", parameters), sketchFile);

vector<string> file;
file.push_back(files[i]);
threadPool.runWhenThreadAvailable(new SketchInput(file, 0, 0, "", "", parameters), sketchFile);
}
else
{
Expand Down Expand Up @@ -335,7 +349,7 @@ bool Sketch::sketchFileBySequence(FILE * file, ThreadPool<Sketch::SketchInput, S
//
memcpy(seqCopy, seq->seq.s, l);

threadPool->runWhenThreadAvailable(new SketchInput("", seqCopy, l, string(seq->name.s, seq->name.l), string(seq->comment.s, seq->comment.l), parameters), sketchSequence);
threadPool->runWhenThreadAvailable(new SketchInput(vector<string>(), seqCopy, l, string(seq->name.s, seq->name.l), string(seq->comment.s, seq->comment.l), parameters), sketchSequence);

while ( threadPool->outputAvailable() )
{
Expand Down Expand Up @@ -918,7 +932,7 @@ bool hasSuffix(string const & whole, string const & suffix)

Sketch::SketchOutput * loadCapnp(Sketch::SketchInput * input)
{
const char * file = input->fileName.c_str();
const char * file = input->fileNames[0].c_str();
int fd = open(file, O_RDONLY);

struct stat fileInfo;
Expand Down Expand Up @@ -1137,31 +1151,13 @@ void setMinHashesForReference(Sketch::Reference & reference, const MinHashHeap &

Sketch::SketchOutput * sketchFile(Sketch::SketchInput * input)
{
gzFile fp;

if ( input->fileName == "-" )
{
fp = gzdopen(fileno(stdin), "r");
}
else
{
fp = gzopen(input->fileName.c_str(), "r");
}

kseq_t *seq = kseq_init(fp);

const Sketch::Parameters & parameters = input->parameters;

Sketch::SketchOutput * output = new Sketch::SketchOutput();

output->references.resize(1);
Sketch::Reference & reference = output->references[0];

if ( input->fileName != "-" )
{
reference.name = input->fileName;
}

MinHashHeap minHashHeap(parameters.use64, parameters.minHashesPerWindow, parameters.reads ? parameters.minCov : 1, parameters.memoryBound);

reference.length = 0;
Expand All @@ -1171,8 +1167,57 @@ Sketch::SketchOutput * sketchFile(Sketch::SketchInput * input)
int count = 0;
bool skipped = false;

while ((l = kseq_read(seq)) >= 0)
int fileCount = input->fileNames.size();
gzFile fps[fileCount];
list<kseq_t *> kseqs;
//
for ( int f = 0; f < fileCount; f++ )
{
if ( input->fileNames[f] == "-" )
{
if ( f > 1 )
{
cerr << "ERROR: '-' for stdin must be first input" << endl;
exit(1);
}

fps[f] = gzdopen(fileno(stdin), "r");
}
else
{
if ( reference.name == "" && input->fileNames[f] != "-" )
{
reference.name = input->fileNames[f];
}

fps[f] = gzopen(input->fileNames[f].c_str(), "r");
}

kseqs.push_back(kseq_init(fps[f]));
}

list<kseq_t *>::iterator it = kseqs.begin();

while ( kseqs.begin() != kseqs.end() )
{
l = kseq_read(*it);

if ( l < -1 ) // error
{
break;
}

if ( l == -1 ) // eof
{
kseq_destroy(*it);
it = kseqs.erase(it);
if ( it == kseqs.end() )
{
it = kseqs.begin();
}
continue;
}

if ( l < parameters.kmerSize )
{
skipped = true;
Expand All @@ -1181,21 +1226,22 @@ Sketch::SketchOutput * sketchFile(Sketch::SketchInput * input)

if ( count == 0 )
{
if ( input->fileName == "-" )
if ( input->fileNames[0] == "-" )
{
reference.name = seq->name.s;
reference.comment = seq->comment.s ? seq->comment.s : "";
reference.name = (*it)->name.s;
reference.comment = (*it)->comment.s ? (*it)->comment.s : "";
}
else
{
reference.comment = seq->name.s;
reference.comment = (*it)->name.s;
reference.comment.append(" ");
reference.comment.append(seq->comment.s ? seq->comment.s : "");
reference.comment.append((*it)->comment.s ? (*it)->comment.s : "");
}
}

count++;


//if ( verbosity > 0 && parameters.windowed ) cout << '>' << seq->name.s << " (" << l << "nt)" << endl << endl;
//if (seq->comment.l) printf("comment: %s\n", seq->comment.s);
//printf("seq: %s\n", seq->seq.s);
Expand All @@ -1206,13 +1252,20 @@ Sketch::SketchOutput * sketchFile(Sketch::SketchInput * input)
reference.length += l;
}

addMinHashes(minHashHeap, seq->seq.s, l, parameters);
addMinHashes(minHashHeap, (*it)->seq.s, l, parameters);

if ( parameters.reads && parameters.targetCov > 0 && minHashHeap.estimateMultiplicity() >= parameters.targetCov )
{
l = -1; // success code
break;
}

it++;

if ( it == kseqs.end() )
{
it = kseqs.begin();
}
}

if ( parameters.reads )
Expand All @@ -1239,19 +1292,19 @@ Sketch::SketchOutput * sketchFile(Sketch::SketchInput * input)

if ( l != -1 )
{
cerr << "\nERROR: reading " << input->fileName << "." << endl;
cerr << "\nERROR: reading " << (input->fileNames.size() > 0 ? "input files" : input->fileNames[0]) << "." << endl;
exit(1);
}

if ( reference.length == 0 )
{
if ( skipped )
{
cerr << "\nWARNING: All fasta records in " << input->fileName << " were shorter than the k-mer size (" << parameters.kmerSize << ")." << endl;
cerr << "\nWARNING: All fasta records in " << (input->fileNames.size() > 0 ? "input files" : input->fileNames[0]) << " were shorter than the k-mer size (" << parameters.kmerSize << ")." << endl;
}
else
{
cerr << "\nERROR: Did not find fasta records in \"" << input->fileName << "\"." << endl;
cerr << "\nERROR: Did not find fasta records in \"" << (input->fileNames.size() > 0 ? "input files" : input->fileNames[0]) << "\"." << endl;
}

exit(1);
Expand All @@ -1273,8 +1326,10 @@ Sketch::SketchOutput * sketchFile(Sketch::SketchInput * input)
}
}

kseq_destroy(seq);
gzclose(fp);
for ( int i = 0; i < fileCount; i++ )
{
gzclose(fps[i]);
}

return output;
}
Expand Down
9 changes: 6 additions & 3 deletions src/mash/Sketch.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ class Sketch

struct SketchInput
{
SketchInput(std::string fileNameNew, char * seqNew, uint64_t lengthNew, const std::string & nameNew, const std::string & commentNew, const Sketch::Parameters & parametersNew)
SketchInput(std::vector<std::string> fileNamesNew, char * seqNew, uint64_t lengthNew, const std::string & nameNew, const std::string & commentNew, const Sketch::Parameters & parametersNew)
:
fileName(fileNameNew),
fileNames(fileNamesNew),
seq(seqNew),
length(lengthNew),
name(nameNew),
Expand All @@ -159,7 +159,7 @@ class Sketch
}
}

std::string fileName;
std::vector<std::string> fileNames;

char * seq;

Expand Down Expand Up @@ -200,7 +200,10 @@ class Sketch
bool hasHashCounts() const {return references.size() > 0 && references.at(0).counts.size() > 0;}
bool hasLociByHash(hash_t hash) const {return lociByHash.count(hash);}
int initFromFiles(const std::vector<std::string> & files, const Parameters & parametersNew, int verbosity = 0, bool enforceParameters = false, bool contain = false);
void initFromReads(const std::vector<std::string> & files, const Parameters & parametersNew);
uint64_t initParametersFromCapnp(const char * file);
void setReferenceName(int i, const std::string name) {references[i].name = name;}
void setReferenceComment(int i, const std::string comment) {references[i].comment = comment;}
bool sketchFileBySequence(FILE * file, ThreadPool<Sketch::SketchInput, Sketch::SketchOutput> * threadPool);
void useThreadOutput(SketchOutput * output);
void warnKmerSize(uint64_t lengthMax, const std::string & lengthMaxName, double randomChance, int kMin, int warningCount) const;
Expand Down
6 changes: 6 additions & 0 deletions src/mash/sketchParameterSetup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ int sketchParameterSetup(Sketch::Parameters & parameters, const Command & comman
parameters.genomeSize = command.getOption("genome").getArgumentAsNumber();
}

if ( parameters.reads && command.getOption("threads").active )
{
cerr << "ERROR: The option " << command.getOption("threads").identifier << " cannot be used with " << command.getOption("reads").identifier << "." << endl;
return 1;
}

if ( parameters.reads && ! parameters.concatenated )
{
cerr << "ERROR: The option " << command.getOption("individual").identifier << " cannot be used with " << command.getOption("reads").identifier << "." << endl;
Expand Down

0 comments on commit 03b28ed

Please sign in to comment.