diff --git a/NEWS b/NEWS index 6139e83..24e2655 100644 --- a/NEWS +++ b/NEWS @@ -3,7 +3,7 @@ CHANGES IN VERSION 1.37.2 UTILITIES - o faster `snpgdsLDpruning()` + o faster `snpgdsLDpruning()` and new multi-threading implementation CHANGES IN VERSION 1.36.1 diff --git a/R/LD.R b/R/LD.R index aa346b2..5bffe79 100755 --- a/R/LD.R +++ b/R/LD.R @@ -6,7 +6,7 @@ # A High-performance Computing Toolset for Relatedness and # Principal Component Analysis of SNP Data # -# Copyright (C) 2011 - 2020 Xiuwen Zheng +# Copyright (C) 2011 - 2024 Xiuwen Zheng # License: GPL-3 # @@ -117,11 +117,6 @@ snpgdsLDpruning <- function(gdsobj, sample.id=NULL, snp.id=NULL, stopifnot(is.numeric(ld.threshold), is.finite(ld.threshold), length(ld.threshold)==1L) stopifnot(is.numeric(num.thread), num.thread > 0L) - if (num.thread > 1L) - { - warning("The current version of 'snpgdsLDpruning()' ", - "does not support multi-threading.", call.=FALSE, immediate.=TRUE) - } stopifnot(is.logical(verbose), length(verbose)==1L) start.pos <- match.arg(start.pos) @@ -199,7 +194,8 @@ snpgdsLDpruning <- function(gdsobj, sample.id=NULL, snp.id=NULL, first = 1L, last = n.tmp) rv <- .Call(gnrLDpruning, startidx, position[flag], - slide.max.bp, slide.max.n, ld.threshold, method, verbose) + slide.max.bp, slide.max.n, ld.threshold, method, + num.thread, verbose) # output L <- rep(FALSE, length(total.snp.ids)) diff --git a/src/SNPRelate.cpp b/src/SNPRelate.cpp index c1de866..d03e960 100755 --- a/src/SNPRelate.cpp +++ b/src/SNPRelate.cpp @@ -1109,7 +1109,7 @@ COREARRAY_DLL_EXPORT void R_init_SNPRelate(DllInfo *info) extern SEXP gnrIndInbCoef(SEXP, SEXP, SEXP); extern SEXP gnrLDMat(SEXP, SEXP, SEXP, SEXP, SEXP); extern SEXP gnrLDpair(SEXP, SEXP, SEXP); - extern SEXP gnrLDpruning(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); + extern SEXP gnrLDpruning(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); extern SEXP gnrSlidingNumWin(SEXP, SEXP, SEXP, SEXP); extern SEXP gnrSlidingWindow(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); extern SEXP gnrPairScore(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); @@ -1154,7 +1154,7 @@ COREARRAY_DLL_EXPORT void R_init_SNPRelate(DllInfo *info) CALL(gnrIBSAve, 3), CALL(gnrIBSNum, 2), CALL(gnrIndInb, 5), CALL(gnrIndInbCoef, 3), CALL(gnrSSEFlag, 0), CALL(gnrLDMat, 5), - CALL(gnrLDpair, 3), CALL(gnrLDpruning, 6), + CALL(gnrLDpair, 3), CALL(gnrLDpruning, 8), CALL(gnrPairScore, 7), CALL(gnrPairIBD, 8), CALL(gnrPairIBDLogLik, 5), CALL(gnrPCA, 5), CALL(gnrPCACorr, 5), diff --git a/src/genLD.cpp b/src/genLD.cpp index afd81b1..1cc88c7 100755 --- a/src/genLD.cpp +++ b/src/genLD.cpp @@ -188,7 +188,6 @@ namespace LD C_UInt8 g2 = (0<=*snp2 && *snp2<=2) ? (*snp2 | ~0x03) : 0xFF; size_t _p = (size_t(g1) << 8) | (g2); size_t _q = (size_t(g2) << 8) | (g1); - n += Valid_Num_SNP[_p]; naa += Num_aa_SNP[_p]; nbb += Num_aa_SNP[_q]; naA += Num_aA_SNP[_p]; nbB += Num_aA_SNP[_q]; @@ -739,21 +738,78 @@ namespace LD { switch (LD_Method) { - case 1: - return PairComposite(snp1, snp2); - case 2: - return PairR(snp1, snp2); - case 3: - return PairDPrime(snp1, snp2); - case 4: - return PairCorr(snp1, snp2); + case 1: return PairComposite(snp1, snp2); + case 2: return PairR(snp1, snp2); + case 3: return PairDPrime(snp1, snp2); + case 4: return PairCorr(snp1, snp2); + default: return R_NaN; } - return R_NaN; } + class CThreadPoolLD + { + public: + const C_UInt8 *BaseGeno; + vector List; + CThreadPoolEx thpool; + + /// constructor + CThreadPoolLD(int num_thread, const C_UInt8 *baseG, double ld_cutoff): + BaseGeno(baseG), thpool(num_thread), nthread(num_thread), + thread_start(num_thread), thread_length(num_thread), + test_flag(num_thread), LD_threshold(ld_cutoff) + { } + + inline void TestLD(const C_UInt8 geno[], bool &to_include) + { + if (to_include) + { + if (nthread <= 1) + { + if (fabs(_CalcLD(geno, BaseGeno)) > LD_threshold) + to_include = false; + } else + List.push_back(geno); + } + } + + inline void TestLD_finish(bool &to_include) + { + if (to_include && nthread>1 && !List.empty()) + { + thpool.Split(nthread, List.size(), &thread_start[0], &thread_length[0]); + thpool.BatchWork(this, &CThreadPoolLD::thread_calc_LD, nthread); + for (int i=0; i < nthread; i++) + { + if (test_flag[i]) { to_include = false; break; } + } + } + } + + private: + int nthread; + vector thread_start, thread_length; + vector test_flag; + double LD_threshold; + + void thread_calc_LD(size_t i, size_t num) + { + test_flag[i] = false; + size_t st = thread_start[i]; + for (size_t n = thread_length[i]; n > 0; n--) + { + if (fabs(_CalcLD(List[st++], BaseGeno)) > LD_threshold) + { + test_flag[i] = true; break; + } + } + + } + }; + COREARRAY_DLL_LOCAL void Perform_LD_Pruning(int StartIdx, int *pos_bp, int slide_max_bp, int slide_max_n, const double LD_threshold, - C_BOOL *out_SNP, bool verbose) + C_BOOL *out_SNP, int num_thread, bool verbose) { // initial variables nPackedSamp = (MCWorkingGeno.Space().SampleNum() % 4 > 0) ? @@ -772,6 +828,8 @@ namespace LD CdProgression progress(2, verbose); if (verbose) Rprintf("|"); + CThreadPoolLD SetLD(num_thread, &buf[0], LD_threshold); + // for-loop, increasing progress.Init(MCWorkingGeno.Space().SNPNum()-StartIdx-1, verbose); for (int i=StartIdx+1; i < MCWorkingGeno.Space().SNPNum(); i++) @@ -780,6 +838,7 @@ namespace LD BufSNP.ReadPackedGeno(i, &buf[0]); progress.Forward(1, verbose); // detect LD + SetLD.List.clear(); bool to_include = true; for (list::iterator it=ListGeno.begin(); it != ListGeno.end(); ) { @@ -787,9 +846,7 @@ namespace LD if ((abs(i - it->idx) <= slide_max_n) && (abs(pos_bp[i] - it->pos_bp) <= slide_max_bp)) { - if (to_include && - (fabs(_CalcLD(&(it->genobuf[0]), &buf[0])) > LD_threshold)) - to_include = false; + SetLD.TestLD(&(it->genobuf[0]), to_include); it ++; } else { // delete it @@ -798,6 +855,7 @@ namespace LD ListGeno.erase(tmp_it); } } + SetLD.TestLD_finish(to_include); // handle out_SNP[i] = to_include; if (out_SNP[i]) @@ -836,6 +894,7 @@ namespace LD BufSNP.ReadPackedGeno(i, &buf[0]); progress.Forward(1, verbose); // detect LD + SetLD.List.clear(); bool to_include = true; for (list::iterator it=ListGeno.begin(); it != ListGeno.end(); ) { @@ -843,9 +902,7 @@ namespace LD if ((abs(i - it->idx) <= slide_max_n) && (abs(pos_bp[i] - it->pos_bp) <= slide_max_bp)) { - if (to_include && - (fabs(_CalcLD(&(it->genobuf[0]), &buf[0])) > LD_threshold)) - to_include = false; + SetLD.TestLD(&(it->genobuf[0]), to_include); it++; } else { // delete it @@ -854,6 +911,7 @@ namespace LD ListGeno.erase(tmp_it); } } + SetLD.TestLD_finish(to_include); // handle out_SNP[i] = to_include; if (out_SNP[i]) @@ -955,16 +1013,17 @@ COREARRAY_DLL_EXPORT SEXP gnrLDMat(SEXP method, SEXP NumSlide, SEXP MatTrim, /// Prune SNPs based on LD COREARRAY_DLL_EXPORT SEXP gnrLDpruning(SEXP StartIdx, SEXP pos_bp, SEXP slide_max_bp, SEXP slide_max_n, SEXP LD_threshold, SEXP method, - SEXP verbose) + SEXP NumThread, SEXP verbose) { COREARRAY_TRY vector flag(MCWorkingGeno.Space().SNPNum()); + const int nthread = Rf_asInteger(NumThread); LD::LD_Method = Rf_asInteger(method); LD::Perform_LD_Pruning(Rf_asInteger(StartIdx)-1, INTEGER(pos_bp), Rf_asInteger(slide_max_bp), Rf_asInteger(slide_max_n), Rf_asReal(LD_threshold), &flag[0], - Rf_asLogical(verbose) == TRUE); + nthread, Rf_asLogical(verbose) == TRUE); PROTECT(rv_ans = NEW_LOGICAL(MCWorkingGeno.Space().SNPNum())); int *p = LOGICAL(rv_ans);