Skip to content

Commit

Permalink
multi-threading snpgdsLDpruning()
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengxwen committed Mar 8, 2024
1 parent 303f40d commit b622b44
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 29 deletions.
2 changes: 1 addition & 1 deletion NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ CHANGES IN VERSION 1.37.2

UTILITIES

o faster `snpgdsLDpruning()`
o faster `snpgdsLDpruning()` and new multi-threading implementation


CHANGES IN VERSION 1.36.1
Expand Down
10 changes: 3 additions & 7 deletions R/LD.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# A High-performance Computing Toolset for Relatedness and
# Principal Component Analysis of SNP Data
#
# Copyright (C) 2011 - 2020 Xiuwen Zheng
# Copyright (C) 2011 - 2024 Xiuwen Zheng
# License: GPL-3
#

Expand Down Expand Up @@ -117,11 +117,6 @@ snpgdsLDpruning <- function(gdsobj, sample.id=NULL, snp.id=NULL,
stopifnot(is.numeric(ld.threshold), is.finite(ld.threshold),
length(ld.threshold)==1L)
stopifnot(is.numeric(num.thread), num.thread > 0L)
if (num.thread > 1L)
{
warning("The current version of 'snpgdsLDpruning()' ",
"does not support multi-threading.", call.=FALSE, immediate.=TRUE)
}
stopifnot(is.logical(verbose), length(verbose)==1L)

start.pos <- match.arg(start.pos)
Expand Down Expand Up @@ -199,7 +194,8 @@ snpgdsLDpruning <- function(gdsobj, sample.id=NULL, snp.id=NULL,
first = 1L,
last = n.tmp)
rv <- .Call(gnrLDpruning, startidx, position[flag],
slide.max.bp, slide.max.n, ld.threshold, method, verbose)
slide.max.bp, slide.max.n, ld.threshold, method,
num.thread, verbose)

# output
L <- rep(FALSE, length(total.snp.ids))
Expand Down
4 changes: 2 additions & 2 deletions src/SNPRelate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,7 @@ COREARRAY_DLL_EXPORT void R_init_SNPRelate(DllInfo *info)
extern SEXP gnrIndInbCoef(SEXP, SEXP, SEXP);
extern SEXP gnrLDMat(SEXP, SEXP, SEXP, SEXP, SEXP);
extern SEXP gnrLDpair(SEXP, SEXP, SEXP);
extern SEXP gnrLDpruning(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
extern SEXP gnrLDpruning(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
extern SEXP gnrSlidingNumWin(SEXP, SEXP, SEXP, SEXP);
extern SEXP gnrSlidingWindow(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
extern SEXP gnrPairScore(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
Expand Down Expand Up @@ -1154,7 +1154,7 @@ COREARRAY_DLL_EXPORT void R_init_SNPRelate(DllInfo *info)
CALL(gnrIBSAve, 3), CALL(gnrIBSNum, 2),
CALL(gnrIndInb, 5), CALL(gnrIndInbCoef, 3),
CALL(gnrSSEFlag, 0), CALL(gnrLDMat, 5),
CALL(gnrLDpair, 3), CALL(gnrLDpruning, 6),
CALL(gnrLDpair, 3), CALL(gnrLDpruning, 8),
CALL(gnrPairScore, 7),
CALL(gnrPairIBD, 8), CALL(gnrPairIBDLogLik, 5),
CALL(gnrPCA, 5), CALL(gnrPCACorr, 5),
Expand Down
97 changes: 78 additions & 19 deletions src/genLD.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ namespace LD
C_UInt8 g2 = (0<=*snp2 && *snp2<=2) ? (*snp2 | ~0x03) : 0xFF;
size_t _p = (size_t(g1) << 8) | (g2);
size_t _q = (size_t(g2) << 8) | (g1);

n += Valid_Num_SNP[_p];
naa += Num_aa_SNP[_p]; nbb += Num_aa_SNP[_q];
naA += Num_aA_SNP[_p]; nbB += Num_aA_SNP[_q];
Expand Down Expand Up @@ -739,21 +738,78 @@ namespace LD
{
switch (LD_Method)
{
case 1:
return PairComposite(snp1, snp2);
case 2:
return PairR(snp1, snp2);
case 3:
return PairDPrime(snp1, snp2);
case 4:
return PairCorr(snp1, snp2);
case 1: return PairComposite(snp1, snp2);
case 2: return PairR(snp1, snp2);
case 3: return PairDPrime(snp1, snp2);
case 4: return PairCorr(snp1, snp2);
default: return R_NaN;
}
return R_NaN;
}

class CThreadPoolLD
{
public:
const C_UInt8 *BaseGeno;
vector<const C_UInt8*> List;
CThreadPoolEx<CThreadPoolLD> thpool;

/// constructor
CThreadPoolLD(int num_thread, const C_UInt8 *baseG, double ld_cutoff):
BaseGeno(baseG), thpool(num_thread), nthread(num_thread),
thread_start(num_thread), thread_length(num_thread),
test_flag(num_thread), LD_threshold(ld_cutoff)
{ }

inline void TestLD(const C_UInt8 geno[], bool &to_include)
{
if (to_include)
{
if (nthread <= 1)
{
if (fabs(_CalcLD(geno, BaseGeno)) > LD_threshold)
to_include = false;
} else
List.push_back(geno);
}
}

inline void TestLD_finish(bool &to_include)
{
if (to_include && nthread>1 && !List.empty())
{
thpool.Split(nthread, List.size(), &thread_start[0], &thread_length[0]);
thpool.BatchWork(this, &CThreadPoolLD::thread_calc_LD, nthread);
for (int i=0; i < nthread; i++)
{
if (test_flag[i]) { to_include = false; break; }
}
}
}

private:
int nthread;
vector<size_t> thread_start, thread_length;
vector<int> test_flag;
double LD_threshold;

void thread_calc_LD(size_t i, size_t num)
{
test_flag[i] = false;
size_t st = thread_start[i];
for (size_t n = thread_length[i]; n > 0; n--)
{
if (fabs(_CalcLD(List[st++], BaseGeno)) > LD_threshold)
{
test_flag[i] = true; break;
}
}

}
};

COREARRAY_DLL_LOCAL void Perform_LD_Pruning(int StartIdx, int *pos_bp,
int slide_max_bp, int slide_max_n, const double LD_threshold,
C_BOOL *out_SNP, bool verbose)
C_BOOL *out_SNP, int num_thread, bool verbose)
{
// initial variables
nPackedSamp = (MCWorkingGeno.Space().SampleNum() % 4 > 0) ?
Expand All @@ -772,6 +828,8 @@ namespace LD
CdProgression progress(2, verbose);
if (verbose) Rprintf("|");

CThreadPoolLD SetLD(num_thread, &buf[0], LD_threshold);

// for-loop, increasing
progress.Init(MCWorkingGeno.Space().SNPNum()-StartIdx-1, verbose);
for (int i=StartIdx+1; i < MCWorkingGeno.Space().SNPNum(); i++)
Expand All @@ -780,16 +838,15 @@ namespace LD
BufSNP.ReadPackedGeno(i, &buf[0]);
progress.Forward(1, verbose);
// detect LD
SetLD.List.clear();
bool to_include = true;
for (list<TSNP>::iterator it=ListGeno.begin(); it != ListGeno.end(); )
{
// check whether it is in the sliding window
if ((abs(i - it->idx) <= slide_max_n) &&
(abs(pos_bp[i] - it->pos_bp) <= slide_max_bp))
{
if (to_include &&
(fabs(_CalcLD(&(it->genobuf[0]), &buf[0])) > LD_threshold))
to_include = false;
SetLD.TestLD(&(it->genobuf[0]), to_include);
it ++;
} else {
// delete it
Expand All @@ -798,6 +855,7 @@ namespace LD
ListGeno.erase(tmp_it);
}
}
SetLD.TestLD_finish(to_include);
// handle
out_SNP[i] = to_include;
if (out_SNP[i])
Expand Down Expand Up @@ -836,16 +894,15 @@ namespace LD
BufSNP.ReadPackedGeno(i, &buf[0]);
progress.Forward(1, verbose);
// detect LD
SetLD.List.clear();
bool to_include = true;
for (list<TSNP>::iterator it=ListGeno.begin(); it != ListGeno.end(); )
{
// check whether it is in the sliding window
if ((abs(i - it->idx) <= slide_max_n) &&
(abs(pos_bp[i] - it->pos_bp) <= slide_max_bp))
{
if (to_include &&
(fabs(_CalcLD(&(it->genobuf[0]), &buf[0])) > LD_threshold))
to_include = false;
SetLD.TestLD(&(it->genobuf[0]), to_include);
it++;
} else {
// delete it
Expand All @@ -854,6 +911,7 @@ namespace LD
ListGeno.erase(tmp_it);
}
}
SetLD.TestLD_finish(to_include);
// handle
out_SNP[i] = to_include;
if (out_SNP[i])
Expand Down Expand Up @@ -955,16 +1013,17 @@ COREARRAY_DLL_EXPORT SEXP gnrLDMat(SEXP method, SEXP NumSlide, SEXP MatTrim,
/// Prune SNPs based on LD
COREARRAY_DLL_EXPORT SEXP gnrLDpruning(SEXP StartIdx, SEXP pos_bp,
SEXP slide_max_bp, SEXP slide_max_n, SEXP LD_threshold, SEXP method,
SEXP verbose)
SEXP NumThread, SEXP verbose)
{
COREARRAY_TRY

vector<C_BOOL> flag(MCWorkingGeno.Space().SNPNum());
const int nthread = Rf_asInteger(NumThread);
LD::LD_Method = Rf_asInteger(method);
LD::Perform_LD_Pruning(Rf_asInteger(StartIdx)-1, INTEGER(pos_bp),
Rf_asInteger(slide_max_bp), Rf_asInteger(slide_max_n),
Rf_asReal(LD_threshold), &flag[0],
Rf_asLogical(verbose) == TRUE);
nthread, Rf_asLogical(verbose) == TRUE);

PROTECT(rv_ans = NEW_LOGICAL(MCWorkingGeno.Space().SNPNum()));
int *p = LOGICAL(rv_ans);
Expand Down

0 comments on commit b622b44

Please sign in to comment.