From d97564ac977bd53a27d9022c1751665dd53d7f0c Mon Sep 17 00:00:00 2001 From: Gabriel Al-Ghalith Date: Thu, 26 Oct 2017 12:11:12 -0500 Subject: [PATCH] v0.99.5a Compressive database implemented. This can greatly speed up database build (in absence of -f), and produce smaller, faster databases when there is a lot of redundant sheared genomes being built from. To invoke, use '-d DNA -s' at a minimum. --- burst.c | 1014 +++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 765 insertions(+), 249 deletions(-) diff --git a/burst.c b/burst.c index f4922f8..5ffbb1c 100644 --- a/burst.c +++ b/burst.c @@ -2,7 +2,7 @@ Copyright (C) 2015-2017 Knights Lab, Regents of the University of Minnesota. This software is released under the GNU Affero General Public License (AGPL) v3.0. */ -#define VER "v0.99.4b" +#define VER "v0.99.5a" #define _LARGEFILE_SOURCE_ #define FILE_OFFSET_BITS 64 #include @@ -60,7 +60,7 @@ This software is released under the GNU Affero General Public License (AGPL) v3. #define LORAM 1 typedef enum { - FORAGE, BEST, ALLPATHS, CAPITALIST, + FORAGE, BEST, ALLPATHS, CAPITALIST, ANY, MATRIX, PAIRED, MAPPED, INLINE } Mode; typedef enum {DNA_16, DNA_8, DNA_4, PROT, DATA, QUICK} DBType; @@ -76,6 +76,7 @@ int Xalpha = 0; int REBASE = 0; int DO_FP = 0; int DO_ACCEL = 0; + uint32_t TAXACUT = 10; float THRES = 0.97f; long REBASE_AMT = 500, DB_QLEN = 500; @@ -111,6 +112,7 @@ long REBASE_AMT = 500, DB_QLEN = 500; printf(" [name]: Optional. Can be DNA, RNA, or QUICK [QUICK]\n");\ printf(" [qLen]: Optional. Max query length to search in DB [%d]\n",DB_QLEN); \ printf("\nPerformance parameters:\n"); \ + printf("--dbpartition (-dp) : Split DB making into chunks (lossy) [%u]\n",1); \ printf("--taxacut (-bc) : allow 1/ disagreeing taxonomy calls. 1/[%u]\n",TAXACUT); \ printf("--taxa_ncbi (-bn): Assume NCBI header format '>xxx|accsn...' for taxonomy\n"); \ printf("--skipambig (-sa): Do not consider highly ambiguous queries (5+ ambigs)\n"); \ @@ -192,6 +194,7 @@ typedef struct { typedef struct {uint32_t v, i;} Split; typedef struct {char *s; uint32_t ix;} StrIxPair; +typedef struct {char *s; uint64_t ix;} StrIxPair64; typedef struct { float score[16]; @@ -227,6 +230,10 @@ struct AccelNode { AccelNode *next; uint32_t ref; }; +typedef struct { + char *seq; + uint32_t len, ix; +} Tuxedo; typedef struct {char *Head, *Tax;} TaxPair_t; //K P C O F G S SS+ @@ -262,6 +269,7 @@ typedef struct { uint32_t *BadList, badListSz; void **Accelerators; DBType dbType; + int cparts; } Reference_Data; typedef struct { @@ -336,6 +344,67 @@ static inline void parallel_sort_strpack(StrIxPair *Pack, uint32_t len) PARALLEL_SORT_PROTOTYPE(StrIxPair, s, 5, NIB5) static inline void parallel_sort_unibin(UniBin *Pack, uint32_t len) PARALLEL_SORT_PROTOTYPE(UniBin, Seq, 5, NIB5) +static inline void parallel_sort_tuxedo(Tuxedo *Pack, uint32_t len) { + int tuxCmp(const void *a, const void *b) { + Tuxedo *A = (Tuxedo *)a, *B = (Tuxedo *)b; + char *s1 = A->seq, *s2 = B->seq; + uint32_t len1 = A->len, len2 = B->len, ml = MIN(len1,len2); + uint32_t i = 5; + for (; i < ml; ++i) if (s1[i] != s2[i]) break; + if (i < ml) return s1[i] - s2[i]; + return len1 < len2 ? -1 : 1; + } + void tuxSort(Tuxedo *a, size_t n, size_t s) + {qsort(a,n,s,tuxCmp);} + #define qsort(a,n,s,c) tuxSort(a,n,s) + PARALLEL_SORT_PROTOTYPE(Tuxedo, seq, 5, NIB5) + #undef qsort +} + +/* static inline void parallel_sort_tuxedo(Tuxedo *Pack, uint32_t len) { + //PARALLEL_SORT_PROTOTYPE(Tuxedo, seq, 5, NIB5) + static const uint32_t NLB = 1 << (5*4); + Tuxedo **Bins = calloc(NLB,sizeof(*Bins)); + uint32_t *BcountO = calloc(1+NLB,sizeof(*BcountO)), + *Bcount = BcountO + 1; + #pragma omp parallel for + for (uint32_t i = 0; i < len; ++i) { + char *s = Pack[i].seq; + uint32_t nib = s[0] << 16 | s[1] << 12 | s[2] << 8 | s[3] << 4 | s[4]; + if (nib >= NLB) {printf("Uh oh. nib [%u] >= NLB [%u]\n",nib,NLB); exit(5);} + #pragma omp atomic update + ++Bcount[nib]; + } + for (uint32_t i = 0; i < NLB; ++i) if (Bcount[i]) + Bins[i] = malloc(Bcount[i]*sizeof(*Bins[i])), + Bcount[i] = 0; + #pragma omp parallel for + for (uint32_t i = 0; i < len; ++i) { + char *s = Pack[i].seq; + uint32_t nib = s[0] << 16 | s[1] << 12 | s[2] << 8 | s[3] << 4 | s[4]; + uint32_t ix; + #pragma omp atomic capture + ix = Bcount[nib]++; + Bins[nib][ix] = Pack[i]; + } + int cmpFunc(const void *a, const void *b) { + return strcmp(((Tuxedo *)a)->seq + 5, + ((Tuxedo *)b)->seq + 5); + } + #pragma omp parallel for schedule(dynamic,1) + for (uint32_t i = 0; i < NLB; ++i) if (Bins[i]) + qsort(Bins[i],Bcount[i],sizeof(*Bins[i]),cmpFunc); + --Bcount; + for (uint32_t i = 2; i <= NLB; ++i) Bcount[i] += Bcount[i-1]; + #pragma omp parallel for schedule(dynamic,1) + for (uint32_t i = 0; i < NLB; ++i) { + uint32_t init = Bcount[i], ed = Bcount[i+1]; + for (uint32_t pt = init; pt < ed; ++pt) + Pack[pt] = Bins[i][pt - init]; + free(Bins[i]); + } + free(Bins), free(BcountO); +} */ // Taxonomy handling char NULLTAX[1] = {0}; @@ -461,7 +530,7 @@ size_t parse_tl_fasta(char * filename, char ***HeadersP, char ***SeqsP, uint32_t } size_t parse_tl_fasta_db(char *ref_FN, char ***HeadersP, char **SeqsP, - uint64_t **OffsP, uint64_t *numLet) { + char ***OffsP, uint64_t *numLet) { size_t linelen = INT32_MAX; //1k entries, 500 meg lines size_t cur_len = linelen, cur_sz = 1000; FILE *file = fopen(ref_FN,"rb"); @@ -469,8 +538,8 @@ size_t parse_tl_fasta_db(char *ref_FN, char ***HeadersP, char **SeqsP, { fprintf(stderr,"Cannot open FASTA file: %s.\n",ref_FN); exit(2); } char **Headers = malloc(cur_sz * sizeof(*Headers)), *Seqs = malloc(cur_len * sizeof(*Seqs)); - uint64_t *Offsets = malloc(cur_sz * sizeof(*Offsets)), - cur_off = -1, ns = -1; + char **Offsets = malloc(cur_sz * sizeof(*Offsets)); + uint64_t cur_off = -1, ns = -1; char *line = malloc(linelen), *lineO = line; if (!line) {fputs("OOM:ptfdLn\n",stderr); exit(3);} int lastHd = 0; @@ -491,7 +560,7 @@ size_t parse_tl_fasta_db(char *ref_FN, char ***HeadersP, char **SeqsP, } lastHd = 1; Headers[ns] = memcpy(malloc(len), line+1, len); - Offsets[ns] = ++cur_off; // insert null post-seq + Offsets[ns] = (char*)++cur_off; // insert null post-seq case '\0': case ' ': break; default: // we're in sequence (hopefully!) lastHd = 0; @@ -515,7 +584,9 @@ size_t parse_tl_fasta_db(char *ref_FN, char ***HeadersP, char **SeqsP, if (!Headers || !Offsets || !Seqs) { fputs("Error OOM terminus p1",stderr); exit(3); } memset(Seqs+cur_off,'\0',15); - Offsets[ns] = cur_off; + for (uint64_t i = 0; i < ns; ++i) + Offsets[i] = (uint64_t)Offsets[i] + Seqs; + Offsets[ns] = Seqs + cur_off; Headers[ns] = 0; // just a tailcap // return the proper data now @@ -1187,10 +1258,6 @@ void setScore() { SCOREFAST[15] = _mm_set_epi8(0,1,1,1,0,1,1,0,1,0,Z,0,0,1,0,-1); //D } -typedef struct { - char *seq; - uint32_t len, ix; -} Tuxedo; static int cpcmp(const void *a, const void *b) { return strcmp(**(char ***)a, **(char ***)b); } static int cmpPackLen(const void *first, const void *second) { @@ -1563,12 +1630,146 @@ static inline void create_sse2_profiles(Reference_Data *Rd) { Rd->ProfClump = ProfClump; } +// New -- Cucasort mk2 inplace +inline int str_d_gt_0(char *A, char *B, uint32_t maxD) { + //for (uint32_t i = 0; i < maxD; ++i) + // if (A[i] != B[i]) return A[i] > B[i]; + for (uint32_t i = 0; i < maxD; i+=16) { + __m128i x = _mm_lddqu_si128((void*)A + i), + y = _mm_lddqu_si128((void*)B + i); + __m128i e = _mm_cmpeq_epi8(x,y); + uint16_t t = _mm_movemask_epi8(e); + if (t != 0xFFFF) { + uint32_t p = __builtin_ctz(t^-1); + if (i+p >= maxD) return 0; + return A[i+p] > B[i+p]; //~t + } + } + return 0; +} +inline void iSort(char **s, uint32_t N, uint32_t d, uint32_t maxD) { + for (uint32_t i = 1, j; i < N; ++i) { + char *key = s[i]; + for (j = i; j && str_d_gt_0(s[j-1]+d,key+d,maxD-d); --j) s[j] = s[j-1]; + //if (j != i) + //memmove(s + j + 1, s + j, sizeof(*s) * (i - j)), + s[j] = key; + } +} +void Cucasort(char **S, uint64_t N, uint32_t d, uint32_t maxD) { + // Recursive termination + if (N < 1 || d >= maxD) return; + if (N < 32) iSort(S,N,d,maxD); + uint64_t Bounds[16]; + { // define scope compartment for temp vars + uint64_t CacheO[17] = {0}, *Cache = CacheO + 1; //, Bounds[16]; + for (uint64_t i = 0; i < N; ++i) + ++Cache[S[i][d]]; + *Bounds = *Cache; + for (int i = 1; i < 16; ++i) + Cache[i] += Cache[i-1], + Bounds[i] = Cache[i]; // sep? + --Cache; + for (int i = 0; i < 16;) { + if (Cache[i] >= Bounds[i]) {++i; continue;} + char *cur = S[Cache[i]], *t; + while (cur[d] != i) // sync needed + t = S[Cache[cur[d]]], + S[Cache[cur[d]]++] = cur, + cur = t; + S[Cache[i]++] = cur; + } + } // scope variables destroyed (some) + // second pass: scan and recurse on identical non-zero ranges + for (int i = 1; i < 16; ++i) { + char **NS = S+Bounds[i-1]; + uint64_t len = Bounds[i] - Bounds[i-1]; + if (len > 1) Cucasort(NS,len,d+1,maxD); + } +} + +//////////////////QS3 +#define CUTOFF 32 +#define MEDCUT 100 +//#define SWAP(s,i,j) { char *t = s[i]; s[i] = s[j]; s[j] = t; } +inline void swap(char **s, int i, int j) + { char *t = s[i]; s[i] = s[j]; s[j] = t; } +inline int m3(char **s, int ia, int ib, int ic, int d) { + int va, vb, vc; + if ((va=s[ia][d]) == (vb=s[ib][d])) return ia; + if ((vc=s[ic][d]) == va || vc == vb) return ic; + return va < vb ? + (vb < vc ? ib : (va < vc ? ic : ia ) ) : + (vb > vc ? ib : (va < vc ? ia : ic ) ); +} +void Qs3w(char **s, unsigned n, int d, int maxD) { + if (d >= maxD) return; + if (n < CUTOFF) { iSort(s,n,d,maxD); return; } + unsigned pl = 0, pm = n >> 1u, z; + int le, lt, gt, ge, v, pn = n-1; + if (n > MEDCUT) { + z = n >> 3u; + pl = m3(s, pl, pl+z, pl + (z << 1u), d); + pm = m3(s, pm-z, pm, pm+z, d); + pn = m3(s, pn - (z << 1u), pn-z, pn, d); + } + pm = m3(s, pl, pm, pn, d); + swap(s, 0, pm); + v = s[0][d]; + for (le = 1; le < n && s[le][d] == v; le++); + if (le == n) { if (v) Qs3w(s, n, d+1, maxD); return; } + lt = le; gt = ge = n-1; + for (;;) { + for ( ; lt <= gt && s[lt][d] <= v; lt++) + if (s[lt][d] == v) swap(s, le++, lt); + for ( ; lt <= gt && s[gt][d] >= v; gt--) + if (s[gt][d] == v) swap(s, gt, ge--); + if (lt > gt) break; + swap(s, lt++, gt--); + } + int r = MIN(le, lt-le); + for (int i = 0; i < r; ++i) swap(s,i,lt-r+i); + r = MIN(ge-gt, n-ge-1); + for (int i = 0; i < r; ++i) swap(s,lt+i,n-r+i); + Qs3w(s, lt-le, d, maxD); + if (v) Qs3w(s + lt-le, le + n-ge-1, d+1, maxD); + Qs3w(s + n-(ge-gt), ge-gt, d, maxD); +} +/////////////////QS3 +inline uint32_t whereDiff(char *A, char *B, uint32_t len) { + for (uint32_t i = 0; i < len; i+=16) { + __m128i x = _mm_lddqu_si128((void*)A + i), + y = _mm_lddqu_si128((void*)B + i); + __m128i e = _mm_cmpeq_epi8(x,y); + uint16_t t = _mm_movemask_epi8(e); + if (t != 0xFFFF) { + uint32_t p = __builtin_ctz(t^-1); + return i+p; //A[i+p] - B[i+p]; //~t + } + } + return len; +} +inline uint32_t whereDiffMsk(char *A, char *B, uint32_t len) { + for (uint32_t i = 0; i < len; i+=16) { + __m128i x = _mm_lddqu_si128((void*)A + i), + y = _mm_lddqu_si128((void*)B + i); + __m128i e = _mm_cmpeq_epi8(_mm_and_si128(x,_mm_set1_epi8(15)), + _mm_and_si128(y,_mm_set1_epi8(15))); + uint16_t t = _mm_movemask_epi8(e); + if (t != 0xFFFF) { + uint32_t p = __builtin_ctz(t^-1); + return i+p; //A[i+p] - B[i+p]; //~t + } + } + return len; +} + // curate is 0 (no dedupe), 1 (dedupe), 2 (make db so skip sse2 profiles) static inline void process_references(char *ref_FN, Reference_Data *Rd, uint32_t maxLenQ, int curate) { // variables for the db code - char *SeqDump = 0; uint64_t numLet = 0, *Offs = 0; + char *SeqDump = 0; uint64_t numLet = 0; // *Offs = 0; if (Rd->dbType == QUICK) Rd->totR = parse_tl_fasta(ref_FN, &Rd->RefHead, &Rd->RefSeq, &Rd->RefLen); - else Rd->totR = parse_tl_fasta_db(ref_FN, &Rd->RefHead, &SeqDump, &Offs, &numLet); + else Rd->totR = parse_tl_fasta_db(ref_FN, &Rd->RefHead, &SeqDump, &Rd->RefSeq, &numLet); printf("Parsed %u references.\n",Rd->totR); // Translate nucleotides by parallel register lookups @@ -1587,9 +1788,256 @@ static inline void process_references(char *ref_FN, Reference_Data *Rd, uint32_t printf("\nInitiating database shearing procedure [shear %u, ov %u].\n", shear, ov); if (Rd->dbType == DNA_16) { - puts("Performing compressive database optimization..."); - puts("NOT YET IMPLEMENTED\n"); exit(1); + #define NL 13 + static const uint64_t NLB = (uint64_t)1 << (NL*2); + #if NL==16 + #define NIB (s[0]-1)<<30|(s[1]-1)<<28|(s[2]-1)<<26|(s[3]-1)<<24|(s[4]-1)<<22|(s[5]-1)<<20|\ + (s[6]-1)<<18|(s[7]-1)<<16|(s[8]-1)<<14|(s[9]-1)<<12|(s[10]-1)<<10|(s[11]-1)<<8|(s[12]-1)<<6|\ + (s[13]-1)<<4|(s[14]-1)<<2|(s[15]-1) + #elif NL==15 + #define NIB (s[0]-1)<<28|(s[1]-1)<<26|(s[2]-1)<<24|(s[3]-1)<<22|(s[4]-1)<<20|\ + (s[5]-1)<<18|(s[6]-1)<<16|(s[7]-1)<<14|(s[8]-1)<<12|(s[9]-1)<<10|(s[10]-1)<<8|(s[11]-1)<<6|\ + (s[12]-1)<<4|(s[13]-1)<<2|(s[14]-1) + #elif NL==14 + #define NIB (s[0]-1)<<26|(s[1]-1)<<24|(s[2]-1)<<22|(s[3]-1)<<20|\ + (s[4]-1)<<18|(s[5]-1)<<16|(s[6]-1)<<14|(s[7]-1)<<12|(s[8]-1)<<10|(s[9]-1)<<8|(s[10]-1)<<6|\ + (s[11]-1)<<4|(s[12]-1)<<2|(s[13]-1) + #elif NL==13 + #define NIB (s[0]-1)<<24|(s[1]-1)<<22|(s[2]-1)<<20|\ + (s[3]-1)<<18|(s[4]-1)<<16|(s[5]-1)<<14|(s[6]-1)<<12|(s[7]-1)<<10|(s[8]-1)<<8|(s[9]-1)<<6|\ + (s[10]-1)<<4|(s[11]-1)<<2|(s[12]-1) + #elif NL==12 + #define NIB (s[0]-1)<<22|(s[1]-1)<<20|\ + (s[2]-1)<<18|(s[3]-1)<<16|(s[4]-1)<<14|(s[5]-1)<<12|(s[6]-1)<<10|(s[7]-1)<<8|(s[8]-1)<<6|\ + (s[9]-1)<<4|(s[10]-1)<<2|(s[11]-1) + #endif + uint32_t shear16p5 = shear + ov; //((shear+ov) / 16u) * 16u + 5u; + Rd->cparts += !Rd->cparts; + uint32_t cp_range = origR / Rd->cparts + (origR % Rd->cparts != 0); + printf("Using compressive optimization (%u partitions; ~%u refs).\n",Rd->cparts,cp_range); + //printf("-->CompDB: shearOv conv = %u --> %u [NLB=%llu]\n",shear+ov,shear16p5,NLB); + + uint64_t *BcountO = calloc(NLB+3,sizeof(*BcountO)); + if (!BcountO) {fputs("OOM:BcountO\n",stderr); exit(3);} + + uint64_t maxChain = 0, maxSh = 0; + double wtime; + for (uint32_t rix = 0, pass = 0; rix < origR; rix += cp_range, ++pass) { + wtime = omp_get_wtime(); + memset(BcountO,0,(NLB+3)*sizeof(*BcountO)); + uint64_t *Bcount = BcountO + 2; + uint32_t red = MIN(origR,rix+cp_range); + #pragma omp parallel for schedule(dynamic) + for (uint32_t i = rix; i < red; ++i) { + char *so = origRefSeq[i]; + uint32_t len = origRefSeq[i+1] - origRefSeq[i]; + //printf("Seq i=%u, len=%u\n",i,len); + if (len < shear16p5) continue; + len -= shear16p5; + for (uint32_t j = 0; j <= len; ++j) { + char *s = so + j; + for (int k=0; k 4) goto L4_P6; + uint32_t nib = NIB; + #pragma omp atomic update + ++Bcount[nib]; + L4_P6:NULL; + } + } + printf("[%u] First pass: populated bin counters [%f]\n", + pass, omp_get_wtime()-wtime); + + wtime = omp_get_wtime(); + for (uint64_t i = 1; i <= NLB; ++i) Bcount[i] += Bcount[i-1]; + uint64_t totCnt = Bcount[NLB]; + printf("--> Out of the %llu original places, %llu are eligible.\n",numLet, totCnt); + --Bcount; + char **Ptrs = malloc(totCnt*sizeof(*Ptrs)); + if (!Ptrs) {fputs("OOM:Ptrs_X\n",stderr); exit(3);} + printf("[%u] Second pass: shift count vector [%f]\n", + pass, omp_get_wtime()-wtime); + + wtime = omp_get_wtime(); + #pragma omp parallel for schedule(dynamic) + for (uint32_t i = rix; i < red; ++i) { + char *so = origRefSeq[i]; + uint32_t len = origRefSeq[i+1] - origRefSeq[i]; + if (len < shear16p5) continue; + len -= shear16p5; + for (uint32_t j = 0; j <= len; ++j) { + char *s = so + j; + for (int k=0; k 4) goto L4_P7; + uint32_t nib = NIB; + uint64_t ix; + #pragma omp atomic capture + ix = Bcount[nib]++; + Ptrs[ix] = s; + L4_P7:NULL; + } + } + printf("[%u] Third pass: add pointers to Ptr vector [%f]\n", + pass, omp_get_wtime() - wtime); + + wtime = omp_get_wtime(); + --Bcount; + #pragma omp parallel for schedule(dynamic,1) + for (uint64_t i = 0; i < NLB; ++i) { + uint64_t len = Bcount[i+1]-Bcount[i]; + char **section = Ptrs + Bcount[i]; + Qs3w(section,len,NL,shear16p5); + //Cucasort(section, len, 5, shear16p5); + } + printf("[%u] Fourth pass: sort segments [%f]\n", + pass, omp_get_wtime() - wtime); + + uint32_t eqlen = shear16p5 - NL, nibLen = 24 - NL; + if (maxChain == 0 && maxSh == 0) { + // Phase 2: duplicate checks + uint64_t dupes = 0, tdupes = 0, useful = 3; + wtime = omp_get_wtime(); + #pragma omp parallel for schedule(dynamic) reduction(+:dupes,tdupes) reduction(max:maxChain,maxSh) + for (uint64_t i = 0; i < NLB; ++i) { + uint64_t chain = 0, sh = 0; + for (uint64_t j = Bcount[i]+1; j < Bcount[i+1]; ++j) { + uint32_t where = whereDiff(Ptrs[j-1]+NL,Ptrs[j]+NL,eqlen); + if (where >= nibLen) ++sh; + else if (sh > maxSh) maxSh = sh; + + if (where >= eqlen) ++chain; + else { + dupes += chain >= useful; + tdupes += chain > 0; + if (chain > maxChain) maxChain = chain; + chain = 0; + } + } + } + printf("[%u] Fifth pass: tally %llu=%f (%llu=%f) [%f]\n", pass, dupes, + (double)dupes/totCnt, tdupes, (double)tdupes/totCnt, omp_get_wtime()-wtime); + printf("--> Max chain = %llu, max sh = %llu\n",maxChain,maxSh); + } + + // dynamic range compression and marking + uint64_t sh1 = sqrt(maxSh)/2, sh2 = sh1*4/3, sh3 = sh1*3; + printf("--> Bounds: max %llu -> sh3 %llu -> sh2 %llu -> sh1 %llu\n", + maxSh,sh3,sh2,sh1); + + wtime = omp_get_wtime(); + #pragma omp parallel for schedule(dynamic) + for (uint64_t i = 0; i < NLB; ++i) { + uint64_t chain = 0, sh = 0; + for (uint64_t j = Bcount[i]+1; j < Bcount[i+1]; ++j) { + uint32_t where = whereDiffMsk(Ptrs[j-1]+NL,Ptrs[j]+NL,eqlen); + + // First consider local matches + if (where >= nibLen) ++sh; + else { + if (sh > sh1) { + uint8_t conv = sh>=sh3 ? 3 : sh>=sh2 ? 2 : 1; + for (uint64_t k = j; k >= j - sh; --k) + *Ptrs[k-1] |= conv << 4; + } + sh = 0; + } + // Consider full dupes (takes priority) + if (where >= eqlen) ++chain; + else { + if (chain) { + uint64_t t = chain*2048/maxChain; + t = MIN(2048,t); + uint8_t conv = 31 - __builtin_clz((uint32_t)t) + 4; + for (uint64_t k = j; k >= j - chain; --k) + *Ptrs[k-1] |= conv << 4; + } + chain = 0; + } + } + } + free(Ptrs); + printf("[%u] Sixth pass: populate duplicates [%f]\n\n", + pass, omp_get_wtime()-wtime); + } + printf("-->CompDB: All sequences tallied.\n"); + free(BcountO); + + // Finally, use the markings to guide shearing + wtime = omp_get_wtime(); + // goal: minimal shear size such that we start a shear at the max-ranked dupe + // need to create reflens out of the offsets between files. + uint32_t *RefLen = Rd->RefLen = malloc(origR*sizeof(*RefLen)); + for (uint32_t i = 0; i < origR; ++i) { + RefLen[i] = origRefSeq[i+1]-origRefSeq[i] - 1; + long unit = (long)RefLen[i] - (long)ov; + unit = unit <= 0 ? 1 : unit; + numRrebase += unit / ov + (unit % ov != 0); + } + numRrebase *= 2; // worst-case with leeway + //ReRefIx = malloc(numRrebase*sizeof(*ReRefIx)); + uint32_t *RefStart = malloc(numRrebase*sizeof(*RefStart)); + // Define (and check) new RefLen, RefSeq, and RefHead too for inline addition + uint32_t *newRefLen = malloc(numRrebase*sizeof(*newRefLen)); + char **newRefSeq = malloc(numRrebase*sizeof(*newRefSeq)), + **newRefHead = malloc(numRrebase*sizeof(*newRefHead)); + if (!RefStart || !newRefHead) {fputs("OOM:RefStart\n",stderr); exit(3);} + printf("-->CompDB: Beginning rebase...\n"); + uint32_t x = 0; // "real" rebase ix (change this so that it's 64-bit? Or cast in malloc's?) + for (uint32_t i = 0; i < origR; ++i) { + uint8_t *ref = (uint8_t *)origRefSeq[i]; + uint32_t bstFlgPos = 0, end = 0, bstFlg = *ref >> 4; + while (end < RefLen[i]) { + uint32_t ix; + //#pragma omp atomic capture + ix = x++; + if (ix >= numRrebase) {fputs("ERROR: Absolutely not. Out of bounds.\n",stderr); exit(4);} + RefStart[ix] = bstFlgPos; + newRefSeq[ix] = ref + bstFlgPos; + newRefHead[ix] = origRefHead[i]; + + //uint32_t bstFlgO = bstFlg; + + // scan for best flag in this range + uint32_t bf=0; + uint32_t bi, maxIX = MIN(RefLen[i],bstFlgPos+shear); + for (uint32_t j = bstFlgPos+1; j < maxIX; ++j) + if (ref[j] >> 4 >= bf) bf = ref[j]>>4, bi = j; + if (bf > bstFlg /* && (bi > bstFlgPos + LATENCY || bf > 3) */) // terminate here + ov + bstFlgPos = bi; + else bstFlgPos += shear; + end = bstFlg > 3 ? MIN(maxIX+ov,RefLen[i]) : MIN(bstFlgPos + ov,RefLen[i]); + bstFlg = ref[bstFlgPos] >> 4; + //end = MIN(bstFlgPos + ov,RefLen[i]); // end of this shear + newRefLen[ix] = end - RefStart[ix]; + //if (!i) printf("Shear %u: start %u [%u] -> bstpos %u end %u [%u]\n", ix, RefStart[ix],bstFlgO,bstFlgPos,end,bstFlg); + } + + // OLD + //long unit = (long)RefLen[i] - (long)ov; + //unit = unit < 0 ? 1 : unit; + //for (uint32_t j = 0; j < unit; j+= shear) + // ReRefIx[x] = i, RefStart[x++] = j; + } + printf("Rebased [%u orig] --> [%u rebase] --> [%u adj]\n",origR,numRrebase/2,x); + + //ReRefIx = realloc(ReRefIx,x*sizeof(*ReRefIx)); + numRrebase = x; + newRefLen = realloc(newRefLen,x*sizeof(*newRefLen)); + newRefSeq = realloc(newRefSeq,x*sizeof(*newRefSeq)); + newRefHead = realloc(newRefHead,x*sizeof(*newRefHead)); + RefStart = realloc(RefStart,x*sizeof(*RefStart)); + Rd->totR = numRrebase; + Rd->RefLen = newRefLen; + Rd->RefSeq = newRefSeq; + Rd->RefHead = newRefHead; + Rd->RefStart = RefStart; + Rd->maxLenR = shear + ov; + + #pragma omp parallel for + for (uint64_t i = 0; i < numLet; ++i) SeqDump[i] &= 15; + free(RefLen); + printf("Duplicate-guided shearing [%f]\n",omp_get_wtime()-wtime); + + //exit(190); + // Final check: defined numRrebase, all RefStart populated; origRefLen+origRefSeq and Rd pointers to them } else { @@ -1607,23 +2055,26 @@ static inline void process_references(char *ref_FN, Reference_Data *Rd, uint32_t for (uint32_t j = 0; j < unit; j+= shear) ReRefIx[x] = i, Rd->RefStart[x++] = j ? j : 0; } - } + + printf("Shorn refs: %u, rebased clumps: %u\n",numRrebase, numRrebase/16 + (numRrebase%16 != 0)); - printf("Shorn refs: %u, rebased clumps: %u\n",numRrebase, numRrebase/16 + (numRrebase%16 != 0)); + // Redefine existing variables to sheared versions + Rd->totR = numRrebase; + Rd->RefLen = malloc(Rd->totR*sizeof(*Rd->RefLen)); + Rd->RefSeq = malloc(Rd->totR*sizeof(*Rd->RefSeq)); + Rd->RefHead = malloc(Rd->totR*sizeof(*Rd->RefHead)); + if (!Rd->RefLen || !Rd->RefSeq || !Rd->RefHead) + {fputs("OOM:RefLen\n",stderr); exit(3);} + Rd->maxLenR = shear + ov; // may actually be less... + for (uint32_t i = 0; i < Rd->totR; ++i) + Rd->RefHead[i] = origRefHead[ReRefIx[i]], + Rd->RefSeq[i] = origRefSeq[ReRefIx[i]] + Rd->RefStart[i], + Rd->RefLen[i] = origRefLen[ReRefIx[i]] - Rd->RefStart[i], + Rd->RefLen[i] = Rd->RefLen[i] > Rd->maxLenR ? Rd->maxLenR : Rd->RefLen[i]; + free(ReRefIx); + } - // Redefine existing variables to sheared versions - Rd->totR = numRrebase; - Rd->RefLen = malloc(Rd->totR*sizeof(*Rd->RefLen)); - Rd->RefSeq = malloc(Rd->totR*sizeof(*Rd->RefSeq)); - Rd->RefHead = malloc(Rd->totR*sizeof(*Rd->RefHead)); - if (!Rd->RefLen || !Rd->RefSeq || !Rd->RefHead) - {fputs("OOM:RefLen\n",stderr); exit(3);} - Rd->maxLenR = shear + ov; // may actually be less... - for (uint32_t i = 0; i < Rd->totR; ++i) - Rd->RefHead[i] = origRefHead[ReRefIx[i]], - Rd->RefSeq[i] = origRefSeq[ReRefIx[i]] + Rd->RefStart[i], - Rd->RefLen[i] = origRefLen[ReRefIx[i]] - Rd->RefStart[i], - Rd->RefLen[i] = Rd->RefLen[i] > Rd->maxLenR ? Rd->maxLenR : Rd->RefLen[i]; + } Rd->RefIxSrt = malloc((1+Rd->totR) * sizeof(*Rd->RefIxSrt)); @@ -1649,12 +2100,21 @@ static inline void process_references(char *ref_FN, Reference_Data *Rd, uint32_t for (uint32_t i = 1; i < Rd->totR; ++i) { if (SeqLenPair[i].len > curTol + lat) { curTol = SeqLenPair[i].len; - qsort(SeqLenPair + prev_ix, i - prev_ix, sizeof(*SeqLenPair),cmpPackSeq); + //qsort(SeqLenPair + prev_ix, i - prev_ix, sizeof(*SeqLenPair),cmpPackSeq); + if (i - prev_ix > 1) { + Tuxedo *thisTux = SeqLenPair + prev_ix; + uint32_t range = i - prev_ix; + printf("Sorting on i=%u, prev = %u (range=%u)\n",i,prev_ix,range); + if (range > 256) parallel_sort_tuxedo(thisTux, range); + else qsort(thisTux, range, sizeof(*SeqLenPair),cmpPackSeq); + } prev_ix = i; } } if (prev_ix < Rd->totR-1) - qsort(SeqLenPair + prev_ix, Rd->totR - prev_ix, sizeof(*SeqLenPair),cmpPackSeq); + //qsort(SeqLenPair + prev_ix, Rd->totR - prev_ix, sizeof(*SeqLenPair),cmpPackSeq); + if (Rd->totR - prev_ix > 1) printf("Sorting final (range=%u)\n",Rd->totR - prev_ix); + parallel_sort_tuxedo(SeqLenPair + prev_ix, Rd->totR - prev_ix); for (uint32_t i = 0; i < Rd->totR; ++i) Rd->RefIxSrt[i] = SeqLenPair[i].ix; free(SeqLenPair); @@ -1667,16 +2127,23 @@ static inline void process_references(char *ref_FN, Reference_Data *Rd, uint32_t if (curate) { // dedupe references (redefine RefIxSrt using RefDedupIx map) uint64_t tot_divR = 0, rdupes = 0; uint32_t uix = 0; // dupe-map, map[uniq_ix] -> orig_ix - Rd->RefDedupIx = calloc(Rd->totR+1,sizeof(*Rd->RefDedupIx)); - for (uint32_t i = 1, j; i < Rd->totR; ++i) { - for (j = 0; j < Rd->RefLen[Rd->RefIxSrt[i]]; ++j) - if (Rd->RefSeq[Rd->RefIxSrt[i]][j] != Rd->RefSeq[Rd->RefIxSrt[i-1]][j]) break; + printf("Producing edb scaffold...\n"); + uint32_t totR = Rd->totR, *RefIxSrt = Rd->RefIxSrt, *RefLen = Rd->RefLen, + *RefDedupIx = calloc(Rd->totR+1,sizeof(*Rd->RefDedupIx)); + char **RefSeq = Rd->RefSeq; + if (!RefDedupIx) {fputs("OOM:RefDedupIx\n",stderr); exit(3);} + for (uint32_t i = 1, j; i < totR; ++i) { + uint32_t curIx = RefIxSrt[i], prevIx = RefIxSrt[i-1], + maxL = MIN(RefLen[curIx],RefLen[prevIx]); + for (j = 0; j < maxL; ++j) + if (RefSeq[curIx][j] != RefSeq[prevIx][j]) break; tot_divR += j; - if (j == Rd->RefLen[Rd->RefIxSrt[i]] && j == Rd->RefLen[Rd->RefIxSrt[i-1]]) ++rdupes; - else Rd->RefDedupIx[++uix] = i; + if (j == RefLen[curIx] && j == RefLen[prevIx]) ++rdupes; + else RefDedupIx[++uix] = i; } - Rd->RefDedupIx[++uix] = Rd->totR; // add an end-cap for closure - printf("Avg. R Divergence: %f (%u dupes, %u uniq)\n",(double)tot_divR/Rd->totR,rdupes,uix); + RefDedupIx[++uix] = totR; // add an end-cap for closure + Rd->RefDedupIx = RefDedupIx; + printf("Avg. R Divergence: %f (%u dupes, %u uniq)\n",(double)tot_divR/totR,rdupes,uix); // Ensure the first db hit is maintained as the unique representative for (uint32_t i = 0; i < uix; ++i) { // not nec. if stable sort? @@ -1709,7 +2176,7 @@ static inline void process_references(char *ref_FN, Reference_Data *Rd, uint32_t Prince *p = Fp.P; printf("How many fingerprints made = %u\n",Fp.nf); if (Z) { // Cluster by N-free profile if N-penalize - puts("WARNING: penalizing N's in fingerprints may slow unpenalized alignments."); + puts("Note: penalizing N's in fingerprints may slow unpenalized alignments."); Prince t; // New: swap to error-free fingerprints anyway (higher quality clusters for N-case and without) for (uint32_t i = 0; i < Rd->totR; ++i) t = p[i], p[i] = p[Fp.Ptrs[i]], p[Fp.Ptrs[i]] = t; // swap FPs } @@ -1733,7 +2200,7 @@ static inline void process_references(char *ref_FN, Reference_Data *Rd, uint32_t // The direct fingerprint way double wtime = omp_get_wtime(); - #define BANDED_FP 1000000 + #define BANDED_FP 1048576 // Heuristic: sort FPs by first few bits (Prince.h), then iterate over bounded range if (BANDED_FP) { uint32_t *WordRange = malloc(totR*sizeof(*WordRange)); @@ -1837,13 +2304,85 @@ static inline void process_references(char *ref_FN, Reference_Data *Rd, uint32_t for (uint32_t i = 0; i < totR; ++i) P[i] = p[i]; - uint32_t RI[THREADS]; + uint32_t RI[THREADS]; for (int z = 0; z < THREADS; ++z) RI[z] = -1; //void *PC_init = 0; Prince *PC = calloc_a(64,(tot16 >> 4)*sizeof(*PC),&PC_init); Prince *PC = calloc(tot16 >> 4,sizeof(*PC)); if (!PC) {fputs("OOM:PC_init\n",stderr); exit(3);} // naive greedy clusterer { double wtime = omp_get_wtime(); + #ifdef CUBICLUST + // cubic zirconiu(m;^3) + uint32_t *T_Pack[THREADS], *T_Ppk[THREADS], + hshSz = (MIN(totR,BANDED_FP) >> 3) + 7; + uint8_t *T_Hash[THREADS]; + #pragma omp parallel + T_Hash[omp_get_thread_num()] = calloc(hshSz,1), + T_Pack[omp_get_thread_num()] = calloc(16,sizeof(uint32_t)), + T_Ppk[omp_get_thread_num()] = calloc(16,sizeof(uint32_t)); + + for (uint32_t i = 0; i < totR; i+=16) { + uint32_t gmin = -1, gdst = -1, + range1 = MIN(totR,i+THREADS), range2 = MIN(totR, i+BANDED_FP); + #pragma omp parallel + { + int tid = omp_get_thread_num(); + uint8_t *Hash = T_Hash[tid]; + uint32_t *Pack = T_Pack[tid], *Ppk = T_Ppk[tid]; + uint32_t pmin = -1, pdst = -1, *t; + + #pragma omp for reduction(min:gmin) + for (uint32_t j = i; j < range1; ++j) { + Pack[0] = j, Hash[(j-i)>>3] = 1 << ((j-i) & 7); + Prince centroid = P[j]; + uint32_t min = -1, dst = -1, totD = 0; + for (int z = 1; z < 16; ++z) { + min = -1, dst = -1; + for (uint32_t k = i; k < range2; ++k) { + uint32_t m = k-i; + if (Hash[m>>3] && Hash[m>>3] & (1 << (m&7))) continue; + uint32_t t = FP_union(centroid,P[k]), td; + if (t < min) min = t, Pack[z] = k, dst = FP_dist(centroid,P[k]); + else if (t == min && (td=FP_dist(centroid,P[k])) < dst) + Pack[z] = k, dst = td; + } + if (min != -1) centroid = FP_combine(centroid,P[Pack[z]]), totD += dst, + Hash[(Pack[z]-i)>>3] |= 1 << ((Pack[z]-i) & 7); + else Pack[z] = i; + } + for (int z = 0; z < 16; ++z) Hash[(Pack[z]-i)>>3] = 0; + + if (min < gmin) + gmin = pmin = min, pdst = totD, + t = Ppk, Ppk = Pack, Pack = t; + else if (min == gmin && totD < pdst) + pdst = totD, t = Ppk, Ppk = Pack, Pack = t; + } + T_Pack[tid] = Pack, T_Ppk[tid] = Ppk; + RI[tid] = pmin == gmin ? pdst : -1; + } + uint32_t whichZ = -1; + for (int z = 0; z < THREADS; ++z) if (RI[z] != -1) { //if (RI[z]) { + //if (whichZ == -1 || *T_Ppk[z] < *T_Ppk[whichZ]) whichZ = z; + if (RI[z] < gdst) gdst = RI[z], whichZ = z; + else if (RI[z] == gdst && (*T_Ppk[z] < *T_Ppk[whichZ])) whichZ = z; + } + uint32_t *Score = T_Ppk[whichZ]; + for (int z = 0; z < 16; ++z) { + if (i+z >= totR) break; + Prince tp = P[i+z]; P[i+z] = P[Score[z]]; P[Score[z]] = tp; + uint32_t x = IxArray[i+z]; IxArray[i+z] = IxArray[Score[z]]; + IxArray[Score[z]] = x; + } + if (i & 255) printf("\rClustered %u...\r",i); + } + for (uint32_t i = 0; i < tot16; i+=16) { + Prince charming = P[i]; + for (uint32_t j = 1; j < 16; ++j) + charming = FP_combine(charming,P[i+j]); + PC[i >> 4] = charming; + } + #else Prince centroid = *P; for (uint32_t j = 1; j < totR; ++j) { uint32_t min = -1, mix = -1, dst = -1; @@ -1851,7 +2390,7 @@ static inline void process_references(char *ref_FN, Reference_Data *Rd, uint32_t { uint32_t tix = -1, tmin = -1, tdst = -1; #pragma omp for reduction(min:min) - for (uint32_t k = j; k < MIN(totR,j+BANDED_FP); ++k) { // TODO: replace totR with MIN(totR, j+BANDED_FP) + for (uint32_t k = j; k < MIN(totR,j+BANDED_FP); ++k) { uint32_t t = FP_union(centroid,P[k]), td; if (t < min) min = tmin = t, tix = k, tdst = FP_dist(centroid,P[k]); else if (t == min && (td=FP_dist(centroid,P[k])) < tdst) @@ -1868,17 +2407,19 @@ static inline void process_references(char *ref_FN, Reference_Data *Rd, uint32_t uint32_t x = IxArray[j]; IxArray[j] = IxArray[mix]; IxArray[mix] = x; if (!((j+1) & 15)) PC[j >> 4] = centroid, centroid = P[j+1]; if (j & 255) printf("\rClustered %u...\r",j); + if (totR < tot16) PC[totR >> 4] = centroid; } - if (totR < tot16) PC[totR >> 4] = centroid; + #endif printf("\nDone with greedy (%f)\n", omp_get_wtime() - wtime); totalPop = 0; for (uint32_t i = 0; i < tot16 >> 4; ++i) totalPop += FP_pop(PC+i); - printf("Starting at %llu [%f] for final round.\n",totalPop,(double)(totalPop << 4)/totR); - + printf("Ended at %llu [%f] density.\n",totalPop,(double)(totalPop << 4)/totR); + #pragma omp parallel for for (uint32_t i = 0; i < tot16; ++i) P[i] = IxArray[i] >= totR ? (Prince){0} : p[IxArray[i]]; uint32_t *ClusFPs = malloc((tot16 >> 4)*sizeof(*ClusFPs)); + #pragma omp parallel for for (uint32_t i = 0; i < tot16; i+=16) { Prince charming = P[i]; for (uint32_t j = 1; j < 16; ++j) @@ -1896,169 +2437,14 @@ static inline void process_references(char *ref_FN, Reference_Data *Rd, uint32_t printf("Individual {DEBUG} score = %llu [%f]\n",totalPop,(double)totalPop/totR); } - //exit(1885); - // end of naive greedy clusterer - /* - // Main cluster loop + // end of greedy clusterer - typedef struct { - Prince h1, h2; - uint32_t pop; //, pt; - } Clus; - Clus *CX = malloc(tot16/2*sizeof(*CX)); - for (uint32_t i = 1; i <= 4; ++i) { - uint64_t tally = 0; - uint32_t max = tot16 >> (i-1); - if (i < 1) { - tally = 0; - double wtime = omp_get_wtime(); - for (uint32_t j = 0; j < max; j+=2) { - // our center will be j, and we will swap j+1 with whatever it matches best - uint32_t min = -1, mix = -1, dst = -1; - #pragma omp parallel - { - uint32_t tix = -1, tmin = -1, tdst = -1; - #pragma omp for reduction(min:min) - for (uint32_t k = j+1; k < max; ++k) { - uint32_t t = FP_union(P[j],P[k]), td; - if (t < min) min = tmin = t, tix = k, tdst = FP_dist(P[j],P[k]); - else if (t == min && (td=FP_dist(P[j],P[k])) < tdst) - tix = k, tdst = td; - } - RI[omp_get_thread_num()] = tmin == min ? tix : -1; - } - uint32_t t; - //#pragma omp simd - for (uint32_t k = 0; k < THREADS; ++k) - if (RI[k] != -1 && ((t=FP_dist(P[j],P[RI[k]])) < dst || (t==dst && RI[k] < mix))) mix = RI[k]; - //mix = RI[k] < mix ? RI[k] : mix; - Prince tp = P[j+1]; P[j+1] = P[mix]; P[mix] = tp; - //uint32_t x = IxArray[j+1]; IxArray[j+1] = IxArray[mix]; IxArray[mix] = x; - // vecswap - uint32_t adj = (j+1) << (i-1), amp = 1 << (i-1), tmp; - mix <<= (i-1); - for (uint32_t z = 0; z < amp; ++z) - tmp = IxArray[adj + z], - IxArray[adj + z] = IxArray[mix + z], - IxArray[mix + z] = tmp; - tally += min; // we want to report this in the end - } - printf("Total with greedy = %llu [%f] (%f)\n",tally, (double)(tally << i)/totR, - omp_get_wtime() - wtime); - } - max = tot16 >> i; - for (uint32_t j = 0; j < max; ++j) { - CX[j].h1 = P[j*2], CX[j].h2 = P[j*2+1]; - CX[j].pop = FP_union(CX[j].h1, CX[j].h2); - //CX[j].pt = j << i; // point to cluster's first ix in IxArray - ShfIx[j] = j; - } - - // monte carlo loop - uint32_t pass = 1, swaps = 0, freshswap = 0, dry = 0; - wtime = omp_get_wtime(); - do { - swaps = 0; - #pragma omp parallel for reduction(+:swaps) - for (uint32_t j = 0; j < max; j+=2) { - //swap ShfIx [j], [j+1] - uint32_t i1 = ShfIx[j], i2 = ShfIx[j+1]; - uint32_t t1 = FP_union(CX[i1].h1, CX[i2].h2), - t2 = FP_union(CX[i2].h1, CX[i1].h2); - if (t1 + t2 < CX[i1].pop + CX[i2].pop) { - ++swaps; - CX[i1].pop = t1, CX[i2].pop = t2; - Prince tfp = CX[i1].h2; // swap cache - CX[i1].h2 = CX[i2].h2; // fuse 2 into 1 - CX[i2].h2 = tfp; // fuse (old) 1 into 2 - uint32_t tmp; - i1 <<= i; i2 <<= i; - for (uint32_t z = 1 << (i-1); z < 1 << i; ++z) - tmp = IxArray[i1 + z], - IxArray[i1 + z] = IxArray[i2 + z], - IxArray[i2 + z] = tmp; - } - } - //if (!(pass & 255)) - // printf("\rIter %u, Pass %u, swapped: %llu\r",i,pass,swaps); - - // Slide to allow more swaps - #pragma omp parallel for simd - for (uint32_t z = 0; z < max; ++z) - ShfIx[z] = ShfIx[z] + 1 >= max ? ShfIx[z] + 1 - max : ShfIx[z] + 1; - #define PRAD 0 - if (!swaps) { - if (freshswap && ++dry > 100000 << (i-1)) break; // no change since last time - if (doRand) { - #pragma omp parallel - { - uint64_t seed = omp_get_thread_num() + 1 + mseed; - #pragma omp for - for (uint32_t z = 0; z < max-1; ++z) { - //Cache[z] = QRand64(&seed) % (max - z) + z; //> 200000 ? 200000 : max-z)) + z; - uint32_t r = QRand64(&seed); - r = r % (max-z) + z; //(uint64_t)r * (max - z) / (uint32_t)-1 + z; // / (max - z); - Cache[z] = r; - //uint32_t old; - //old = ShfIx[z]; - //#pragma omp atomic capture - //{old = ShfIx[z]; ShfIx[z] = ShfIx[r];} - //#pragma omp atomic capture - //{ShfIx[z] = ShfIx[r]; ShfIx[r] = old;} - } - #pragma omp single - mseed = seed; - } - for (uint32_t z = 0; z < max - 1; ++z) { - register uint32_t t = ShfIx[z], r = Cache[z]; - ShfIx[z] = ShfIx[r], ShfIx[r] = t; - } - } else { - for (uint32_t z = 0; z < max; ++z) - Cache[z]=ShfIx[2*z >= max ? 2*z - max + 1 : 2*z]; - uint32_t *t = Cache; Cache = ShfIx; ShfIx = t; - } - freshswap = 1; - } else freshswap = 0; - } while (pass++ < 100000 << (i-1)); - tally = 0; - #pragma omp parallel for reduction(+:tally) - for (uint32_t i = 0; i < max; ++i) tally += CX[i].pop; - //printf("Total pop (iter %u, pass %u): %llu [av %f]\n",i, pass, totalPop,(double)totalPop/(tot16 >> i)); - printf("[Iter %u; pass %u] Total = %llu [%f] (%f)\n",i, pass, tally, (double)(tally << i)/totR, - omp_get_wtime() - wtime); - - // prepare next loop vars - if (i == 4) break; - - max = tot16 >> (i+1); - for (uint32_t j = 0; j < max; ++j) { - CX[j].h1 = FP_combine(CX[j*2].h1,CX[j*2].h2); - CX[j].h2 = FP_combine(CX[j*2+1].h1,CX[j*2+1].h2); - P[j*2] = CX[j].h1; - P[j*2+1] = CX[j].h2; - CX[j].pop = FP_union(CX[j].h1, CX[j].h2); - ShfIx[j] = j; - } - //break; - } - free(CX); CX = 0; - totalPop = 0; - #pragma omp parallel for reduction(+:totalPop) - for (uint32_t i = 0; i < tot16; ++i) totalPop += IxArray[i] >= totR ? 0 : FP_pop(p+IxArray[i]); - printf("Total pop (completion): %llu [av %f]\n",totalPop,(double)totalPop/totR); - */ - - // Assuming clusters are placed relatively neatly in order, - // Pick a random element (from any cluster) and swap with a random element (from any other cluster) - // If combined score of the two clusters increases, keep swap. Repeat until convergence. - - //for (uint32_t i = 0; i < tot16; ++i) P[i] = IxArray[i] >= totR ? (Prince){0} : p[IxArray[i]]; - if (sig_thres) { + if (sig_thres) { // Cluster refinement loop uint32_t *ClusFPs = malloc((tot16 >> 4)*sizeof(*ClusFPs)); - + #pragma omp parallel for for (uint32_t i = 0; i < tot16; ++i) P[i] = IxArray[i] >= totR ? (Prince){0} : p[IxArray[i]]; + #pragma omp parallel for for (uint32_t i = 0; i < tot16; i+=16) { Prince charming = P[i]; for (uint32_t j = 1; j < 16; ++j) @@ -2300,7 +2686,11 @@ static inline void process_references(char *ref_FN, Reference_Data *Rd, uint32_t if (curate < 2) create_sse2_profiles(Rd); #endif if (DO_ACCEL) return; - if (!DO_FP) for (uint32_t i = 0; i < origR; ++i) free(origRefSeq[i]); + if (!DO_FP) { + if (Rd->dbType == QUICK) + for (uint32_t i = 0; i < origR; ++i) free(origRefSeq[i]); + else free(SeqDump); + } if (REBASE) free(origRefLen), free(origRefSeq), free(origRefHead); } @@ -3008,7 +3398,7 @@ void make_accelerator(Reference_Data *Rd, char *xcel_FN) { char **RefSeq = Rd->RefSeq; uint32_t *RefIxSrt = Rd->RefIxSrt, *RefLen = Rd->RefLen, totR = Rd->totR, totRC = Rd->numRclumps; - if (Z) fprintf(stderr,"WARNING: N-penalized accelerator not usable for unpenalized alignment\n"); + if (Z) fprintf(stderr,"Note: N-penalized accelerator not usable for unpenalized alignment\n"); //THREADS = THREADS >> 1 ?: 1; //1; //THREADS > 4 ? 4 : THREADS; uint32_t *Bad = 0; @@ -3082,7 +3472,7 @@ void make_accelerator(Reference_Data *Rd, char *xcel_FN) { } #pragma omp single { // create data structures of the perfect size for bads and words - printf("Finished scanning pass [%f].\n",omp_get_wtime() - wtime); + printf("Finished scanning pass [%f]. \n",omp_get_wtime() - wtime); for (uint32_t i = 0; i < 1 << (2*SCOUR_N); ++i) if (F[i].cap) { F[i].Refs = malloc(F[i].cap*sizeof(*F[i].Refs)); if (!F[i].Refs) {fputs("OOM:Accelerant.Refs\n",stderr); exit(3);} @@ -3475,6 +3865,14 @@ static inline void do_alignments(FILE *output, Reference_Data RefDat, Query_Data //uint32_t *UniqQLen, *UniqQed, *UniqDiv, *NewIX, maxDiv, *RCMap, *Umap; //char **UniqQSeq; + if (RUNMODE == ANY) { // preempt TmpRIX creation for inline printing + if (!RefIxSrt && RefDedupIx) { + RefIxSrt = malloc(totR * sizeof(*RefIxSrt)); + if (!RefIxSrt) {fputs("OOM:[DA]RefIxSrt\n",stderr); exit(3);} + for (uint32_t i = 0; i < totR; ++i) RefIxSrt[i] = TmpRIX[RefDedupIx[i]]; + } + else if (!RefIxSrt && !RefDedupIx) RefIxSrt = TmpRIX; + } if (DO_ACCEL) { uint32_t QBUNCH = newUniqQ / (THREADS*128); @@ -3564,14 +3962,32 @@ if (DO_ACCEL) { for (uint32_t j = 0; j < oldNref; ++j) Refs[j].i = 0; //Bucket[j] = 0; // Bucket memset uint32_t bound = MIN(z+QBUNCH,QBins[1]), nref = 0, min_mmatch = UINT32_MAX; uint32_t wix = 0, cix = 0; + UniBins[z].div = 1; // Instead of inner loop; because it trails nothing before it for (uint32_t j = z; j < bound; ++j) { UniBin Ub = UniBins[j]; ShrBin Sb = ShrBins[Ub.six]; uint32_t len = Sb.len, err = Sb.ed; + if (err == -1) continue; char *s = Ub.Seq; uint32_t mmatch = len - SCOUR_N * err - SCOUR_N; // len - (err + 1) * SCOUR_N; if (mmatch < min_mmatch) min_mmatch = mmatch; // bank worst-case threshold //printf("Query %u: len = %u, err = %u, runsize = %u, mmatch = %u\n", j, len, err, (len - err)/(err+1), mmatch); - if (j >= *QBins) {//storeUnambigWords(Word_Ix, s, len, &wix, j); + //if (j == z) Ub.div = 1; // remove in loop below? + uint32_t initk = 0; + //if (Ub.div > SCOUR_N) initk = Ub.div - SCOUR_N; + //initk -= initk != 0; + if (j >= *QBins) { //storeUnambigWords(Word_Ix, s, len, &wix, j); + uint32_t k = initk, w = s[k++] - 1; + for (; k + 1 < SCOUR_N + initk; ++k) + w <<= 2, w |= s[k]-1; + for (; k < len; ++k) { + w <<= 2, w |= s[k]-1; + uint32_t t = (w << SCOUR_R) >> SCOUR_R; + Word_Ix[wix++] = (Split){t,j}; + } + } + else for (uint32_t k = initk; k + SCOUR_N <= len; ++k) + storeAmbigWords(Word_Ix, s + k, &wix, j, 0, 0); + /* if (j >= *QBins) {//storeUnambigWords(Word_Ix, s, len, &wix, j); uint32_t w = *s - 1; for (uint32_t k = 1; k + 1 < SCOUR_N; ++k) w <<= 2, w |= s[k]-1; @@ -3582,7 +3998,7 @@ if (DO_ACCEL) { } } else for (uint32_t k = 0; k + SCOUR_N <= len; ++k) - storeAmbigWords(Word_Ix, s + k, &wix, j, 0, 0); + storeAmbigWords(Word_Ix, s + k, &wix, j, 0, 0); */ /* if (j >= *QBins) setUnambigScour(Hash, Refs, &nref, mmatch, Forest, s, qdim, len, Cache, &cix); else for (uint32_t k = 0; k + SCOUR_N <= len; ++k) @@ -3655,7 +4071,7 @@ if (DO_ACCEL) { uint32_t stack_ix = 0; //thisMax = 0; *StackX = z, *StackE = -1; // initialize stack (worst case) - UniBins[z].div = 1; // force first divergence to 1 + //UniBins[z].div = 1; // force first divergence to 1 (done above) int8_t fp_rediv = 0; for (uint32_t j = z; j < bound; ++j) { @@ -3666,7 +4082,7 @@ if (DO_ACCEL) { // handle early k-term here uint32_t mmatch = len - Emac * SCOUR_N - SCOUR_N; - if (!x && Refs[i].i <= mmatch) { + if (Emac == -1 || (!x && Refs[i].i <= mmatch)) { // Sb->len set to -1 in ANY mode to mark spent query fp_rediv = 1; // puts("-->Skipping (mmatch)"); continue; } @@ -3728,7 +4144,7 @@ if (DO_ACCEL) { //printf("--> min = %u\n",min); if (min <= Sb->ed) { // now we get serious. uint32_t ai = Ub->six + (Ub->rc ? numUniqQ : 0); - if (RUNMODE != FORAGE) { + if (RUNMODE != FORAGE && RUNMODE != ANY) { /*if (min < Sb->ed)*/ Sb->ed = min; ResultPod *prv; if (Pods[ai] && min < Pods[ai]->mismatches) // purge @@ -3740,15 +4156,51 @@ if (DO_ACCEL) { for (int z = 0; z < VECSZ; ++z) { if (mins.u8[z] > min) continue; // skip non-mins ResultPod *tmp = malloc(sizeof(*tmp)); - tmp->next = Pods[ai]; Pods[ai] = tmp; - - Pods[ai]->mismatches = mins.u8[z]; - Pods[ai]->score = MPK.score[z]; //-1.f; // placeholder - Pods[ai]->refIx = ri * VECSZ + z; - Pods[ai]->finalPos = MPK.finalPos[z]; - Pods[ai]->numGapR = MPK.numGapR[z]; - Pods[ai]->numGapQ = MPK.numGapQ[z]; - Pods[ai]->rc = Ub->rc; + tmp->next = Pods[ai]; + tmp->mismatches = mins.u8[z]; + tmp->score = MPK.score[z]; //-1.f; // placeholder + tmp->refIx = ri * VECSZ + z; + tmp->finalPos = MPK.finalPos[z]; + tmp->numGapR = MPK.numGapR[z]; + tmp->numGapQ = MPK.numGapQ[z]; + tmp->rc = Ub->rc; + if (RUNMODE == ANY) { + ResultPod *rp = tmp; + uint32_t rix = RefIxSrt[rp->refIx], + numGap = rp->numGapR + rp->numGapQ, + numMis = rp->mismatches - numGap, + qlen = Sb->len, + alLen = qlen + numGap, + mOff = RefStart ? RefStart[rix] : 0, tmp, + stIxR = rp->finalPos - qlen + rp->numGapR + mOff, + edIxR = rp->finalPos + mOff; + if (rp->rc) tmp = stIxR, stIxR = edIxR, edIxR = tmp; + /* if (taxa_parsed) { + char *tt = taxa_lookup(RefHead[rix],taxa_parsed-1,Taxonomy); + if (taxasuppress) { + uint32_t lm, s = 0; + strcpy(Taxon, tt); + for (lm = 0; TAXLEVELS[lm] < rp->score; ++lm); + if (!lm) FinalTaxon = NULLTAX; + else for (int x = 0; Taxon[x]; ++x) { + if (Taxon[x]==';' && ++s == lm) { + Taxon[x] = 0; break; + } + FinalTaxon = Taxon; + } + } else FinalTaxon = tt; // i + for (uint32_t j = Offset[Ub->six]; j < Offset[Ub->six+1]; ++j) PRINT_MATCH_TAX() + } + else */ + if (Sb->ed != -1) for (uint32_t j = Offset[Ub->six]; j < Offset[Ub->six+1]; ++j) + fprintf(output,"%s\t%s\t%f\t%u\t%u\t%u\t%u\t%u\t%d\t%u\t%u\t%u\n", + QHead[j], RefHead[rix], rp->score * 100, + alLen, numMis, numGap, 1, qlen, stIxR, edIxR, + rp->mismatches,j > Offset[Ub->six]); + Sb->ed = -1; + break; + } + Pods[ai] = tmp; } } } @@ -3882,7 +4334,10 @@ if (DO_ACCEL) { //printf("[%u,qi=%u] UniqDiv = %u, err = %u\n",j,qi,UniqDiv[qi], UniqQed[qi]); uint32_t Emac = Sb->ed; //UniqQed[ai]; - + if (Emac == -1) { + fp_rediv = 1; + continue; + } // handle fingerprinting here if (DO_FP) { if (Fp.N[qi] - FP_intersect(Centroids[ri],Fp.P[qi]) > Emac) { @@ -3897,21 +4352,20 @@ if (DO_ACCEL) { if (t > trigger) trigger = t; } if (Fp.N[qi] - trigger > Emac) {fp_rediv = 1; continue;} - - if (fp_rediv) { - fp_rediv = 0; - if (Emac <= StackE[stack_ix]) { - thisDiv = 1; if (stack_ix) { - uint32_t sai = StackX[stack_ix], lenD; - //sai = sai >= numUniqQ ? RCMap[sai-numUniqQ] : sai; - lenD = MIN(len,ShrBins[UniBins[sai].six].len) - 1; - register char *thisQ = Ub->Seq, *prevQ = UniBins[sai].Seq; //UniqQSeq[StackX[stack_ix]]; - for (uint32_t w = 0; w < lenD && thisDiv < maxDiv && - thisQ[w] == prevQ[w]; ++w) ++thisDiv; - } + } + if (fp_rediv) { + fp_rediv = 0; + if (Emac <= StackE[stack_ix]) { + thisDiv = 1; if (stack_ix) { + uint32_t sai = StackX[stack_ix], lenD; + //sai = sai >= numUniqQ ? RCMap[sai-numUniqQ] : sai; + lenD = MIN(len,ShrBins[UniBins[sai].six].len) - 1; + register char *thisQ = Ub->Seq, *prevQ = UniBins[sai].Seq; //UniqQSeq[StackX[stack_ix]]; + for (uint32_t w = 0; w < lenD && thisDiv < maxDiv && + thisQ[w] == prevQ[w]; ++w) ++thisDiv; } - //Emac = StackE[stack_ix ? stack_ix - 1 : stack_ix]; } + //Emac = StackE[stack_ix ? stack_ix - 1 : stack_ix]; } // if error tol exceed cur stack, increment stack, set this ptr @@ -3945,7 +4399,7 @@ if (DO_ACCEL) { if (min <= Sb->ed) { // now we get serious. //uint32_t ai = Ub->six; - if (RUNMODE != FORAGE) { + if (RUNMODE != FORAGE && RUNMODE != ANY) { if (min < Sb->ed) Sb->ed = min; ResultPod *prv; if (Pods[ai] && min < Pods[ai]->mismatches) // purge @@ -3959,15 +4413,52 @@ if (DO_ACCEL) { for (int z = 0; z < VECSZ; ++z) { if (mins.u8[z] > min) continue; // skip non-mins ResultPod *tmp = malloc(sizeof(*tmp)); - tmp->next = Pods[ai]; Pods[ai] = tmp; + tmp->next = Pods[ai]; - Pods[ai]->mismatches = mins.u8[z]; - Pods[ai]->score = MPK.score[z]; //-1.f; // placeholder - Pods[ai]->refIx = ri * VECSZ + z; - Pods[ai]->finalPos = MPK.finalPos[z]; - Pods[ai]->numGapR = MPK.numGapR[z]; - Pods[ai]->numGapQ = MPK.numGapQ[z]; - Pods[ai]->rc = Ub->rc; + tmp->mismatches = mins.u8[z]; + tmp->score = MPK.score[z]; //-1.f; // placeholder + tmp->refIx = ri * VECSZ + z; + tmp->finalPos = MPK.finalPos[z]; + tmp->numGapR = MPK.numGapR[z]; + tmp->numGapQ = MPK.numGapQ[z]; + tmp->rc = Ub->rc; + if (RUNMODE == ANY) { + ResultPod *rp = tmp; + uint32_t rix = RefIxSrt[rp->refIx], + numGap = rp->numGapR + rp->numGapQ, + numMis = rp->mismatches - numGap, + qlen = Sb->len, + alLen = qlen + numGap, + mOff = RefStart ? RefStart[rix] : 0, tmp, + stIxR = rp->finalPos - qlen + rp->numGapR + mOff, + edIxR = rp->finalPos + mOff; + if (rp->rc) tmp = stIxR, stIxR = edIxR, edIxR = tmp; + /* if (taxa_parsed) { + char *tt = taxa_lookup(RefHead[rix],taxa_parsed-1,Taxonomy); + if (taxasuppress) { + uint32_t lm, s = 0; + strcpy(Taxon, tt); + for (lm = 0; TAXLEVELS[lm] < rp->score; ++lm); + if (!lm) FinalTaxon = NULLTAX; + else for (int x = 0; Taxon[x]; ++x) { + if (Taxon[x]==';' && ++s == lm) { + Taxon[x] = 0; break; + } + FinalTaxon = Taxon; + } + } else FinalTaxon = tt; // i + for (uint32_t j = Offset[Ub->six]; j < Offset[Ub->six+1]; ++j) PRINT_MATCH_TAX() + } + else */ + if (Sb->ed != -1) for (uint32_t j = Offset[Ub->six]; j < Offset[Ub->six+1]; ++j) + fprintf(output,"%s\t%s\t%f\t%u\t%u\t%u\t%u\t%u\t%d\t%u\t%u\t%u\n", + QHead[j], RefHead[rix], rp->score * 100, + alLen, numMis, numGap, 1, qlen, stIxR, edIxR, + rp->mismatches,j > Offset[Ub->six]); + Sb->ed = -1; + break; + } + Pods[ai] = tmp; } } /*if (min <= UniqQed[ai]) { // now we get serious. @@ -4047,8 +4538,10 @@ if (DO_ACCEL) { } printf("\rSearch Progress: [100.00%%]\n"); EOA:NULL; + + if (RUNMODE == ANY) return; + printf("Search complete. Consolidating results...\n"); - //if (totSkipped) printf("NumSkipped = %llu (%f)\n", // totSkipped,(double)totSkipped/((double)numUniqQ*numRclumps)); if (!RefIxSrt && RefDedupIx) { @@ -4057,6 +4550,12 @@ if (DO_ACCEL) { for (uint32_t i = 0; i < totR; ++i) RefIxSrt[i] = TmpRIX[RefDedupIx[i]]; } else if (!RefIxSrt && !RefDedupIx) RefIxSrt = TmpRIX; + + uint32_t maxIX = 0; + #pragma omp simd reduction(max:maxIX) + for (uint32_t i = 0; i < totR; ++i) + maxIX = RefIxSrt[i] > maxIX ? RefIxSrt[i] : maxIX; + printf("Max IX for allocations = %u [totR = %u, origTotR = %u]\n",maxIX,totR,RefDat.origTotR); free(Centroids); free(FpR.initP); free(ProfClump); free(RefClump); free(SeqDumpRC); // free a part of the query memory @@ -4065,6 +4564,7 @@ if (DO_ACCEL) { uint32_t *RefMap = RefDat.RefMap; if (!RefMap) { RefMap = malloc(totR*sizeof(*RefMap)); + if (!RefMap) {fputs("OOM:RefMap\n",stderr); exit(3);} RefDat.numRefHeads = RefDat.totR; for (uint64_t i = 0; i < totR; ++i) RefMap[i] = i; } @@ -4225,8 +4725,12 @@ if (DO_ACCEL) { } } else if (RUNMODE==CAPITALIST) { - uint32_t numBins = totR; //RefDat.numRefHeads; // only unique headers get separate entries + uint32_t numBins = maxIX+1; + + //totR; //RefDat.numRefHeads; // only unique headers get separate entries size_t *RefCounts = calloc(numBins,sizeof(*RefCounts)), tot = 0; + if (!RefCounts) {fputs("OOM:RefCounts\n",stderr); exit(3);} + for (uint32_t i = 0; i < numUniqQ; ++i) { ResultPod *rp = BasePod[i], *best = rp; if (!rp) continue; @@ -4241,6 +4745,9 @@ if (DO_ACCEL) { // for (uint32_t k = RefDedupIx[rp->refIx]; k < RefDedupIx[rp->refIx+1]; ++k) // ++RefCounts[RefMap[TmpRIX[k]]], ++tot; //else + //if (rp->refIx >= totR) {printf("ERROR refIX %u/%u\n",rp->refIx,totR); exit(5);} + //if (RefIxSrt[rp->refIx] >= totR) {printf("ERROR RefIxSrt[rp->refIx] %u/%u\n",RefIxSrt[rp->refIx],totR); continue;} + //if (RefMap[RefIxSrt[rp->refIx]] >= totR) {printf("NOTE: RefMap[RefIxSrt[rp->refIx]] %u/%u\n",RefMap[RefIxSrt[rp->refIx]],totR); } ++RefCounts[RefMap[RefIxSrt[rp->refIx]]], ++tot; } rp = rp->next; @@ -4437,6 +4944,7 @@ int main( int argc, char *argv[] ) { int makedb = 0, doDedupe = 0; // clustradius = 0, incl_whitespace = 0; char *ref_FN = 0, *query_FN = 0, *output_FN = 0, *xcel_FN = 0, *tax_FN = 0; DBType dbType = QUICK; + printf("This is BURST ["VER"]\n"); if (argc < 2) PRINT_USAGE() for (int i = 1; i < argc; ++i) { if (!strcmp(argv[i],"--references") || !strcmp(argv[i],"-r")) { @@ -4487,6 +4995,7 @@ int main( int argc, char *argv[] ) { if (!strcmp(argv[i],"BEST")) RUNMODE = BEST; else if (!strcmp(argv[i],"ALLPATHS")) RUNMODE = ALLPATHS; else if (!strcmp(argv[i],"CAPITALIST")) RUNMODE = CAPITALIST; + else if (!strcmp(argv[i],"ANY")) RUNMODE = ANY; else if (!strcmp(argv[i],"MATRIX")) {fputs("ERROR: Matrix mode is no longer supported\n",stderr); exit(1);} else if (!strcmp(argv[i],"FORAGE")) RUNMODE = FORAGE; @@ -4510,8 +5019,15 @@ int main( int argc, char *argv[] ) { } printf(" --> Creating %s database (assuming max query length %ld)\n",dbsel, DB_QLEN); } + else if (!strcmp(argv[i],"--dbpartition") || !strcmp(argv[i],"-dp")) { + if (++i == argc || argv[i][0] == '-') + { puts("ERROR: --dbpartition requires integer argument."); exit(1);} + int temp = atoi(argv[i]); + if (temp < 0) {fputs("ERROR: numb partitions must be >= 1\n",stderr); exit(1);} + RefDat.cparts = temp; + printf(" --> Partitioning database into %d slices\n",temp); + } else if (!strcmp(argv[i],"--accelerator") || !strcmp(argv[i],"-a")) { - //puts("ERROR: Accelerator not currently implemented [target 0.99.2]"); exit(1); if (++i == argc || argv[i][0] == '-') { puts("ERROR: --accelerator requires filename argument."); exit(1);} xcel_FN = argv[i];