Skip to content

Commit

Permalink
Merge pull request #159 from nvnieuwk/fix/bottlenecks
Browse files Browse the repository at this point in the history
Fix a couple of bottlenecks in the pipeline
  • Loading branch information
nvnieuwk authored Jan 15, 2025
2 parents cbb4b84 + 4035a18 commit bfd3665
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#145](https://github.com/nf-core/rnavar/issues/145) - Converted `star_index` and `gtf` emit channels from queue to value channels in `PREPARE_GENOME` subworkflow
- [#149](https://github.com/nf-core/rnavar/pull/149) - Updated ch_gtf and ch_fasta_fai channels emitted by main.nf
- [#158](https://github.com/nf-core/rnavar/pull/158) - Fixed language server errors and warnings
- [#159](https://github.com/nf-core/rnavar/pull/159) - Fixed a couple of bottlenecks in the pipeline

### Dependencies

Expand Down
14 changes: 6 additions & 8 deletions subworkflows/local/recalibrate/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,14 @@ workflow RECALIBRATE {
ch_versions = ch_versions.mix(APPLYBQSR.out.versions.first())

SAMTOOLS_INDEX(bam_recalibrated)
ch_versions = ch_versions.mix(SAMTOOLS_INDEX.out.versions.first())

def bam_recalibrated_index = bam_recalibrated
.join(SAMTOOLS_INDEX.out.bai, by: [0], remainder: true)
.join(SAMTOOLS_INDEX.out.csi, by: [0], remainder: true) // TODO fix this bottleneck
.map{meta, bam_, bai, csi ->
if (bai) [meta, bam_, bai]
else [meta, bam_, csi]
}
def bam_indices = SAMTOOLS_INDEX.out.bai
.mix(SAMTOOLS_INDEX.out.csi)
.mix(SAMTOOLS_INDEX.out.crai)

ch_versions = ch_versions.mix(SAMTOOLS_INDEX.out.versions.first())
def bam_recalibrated_index = bam_recalibrated
.join(bam_indices, failOnMismatch: true, failOnDuplicate: true)

def bam_reports = Channel.empty()

Expand Down
41 changes: 27 additions & 14 deletions subworkflows/local/splitncigar/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,49 @@ workflow SPLITNCIGAR {
main:
def ch_versions = Channel.empty()

def bam_interval = bam.combine(intervals).map{ meta, bam_, bai, intervals_ -> [ meta + [sample:meta.id], bam_, bai, intervals_ ] }
def bam_interval = bam
.combine(intervals)
.map { meta, bam_, bai, intervals_ ->
[ meta + [interval_count:intervals_ instanceof List ? intervals_.size() : 1], bam_, bai, intervals_ ]
}
.transpose(by:3)
.map { meta, bam_, bai, interval ->
[ meta + [id:"${meta.id}_${interval.baseName}", sample: meta.id], bam_, bai, interval ]
}

GATK4_SPLITNCIGARREADS(bam_interval,
fasta,
fai.map{ fai_ -> [[id:'genome'], fai_] },
dict)

bam_splitncigar = GATK4_SPLITNCIGARREADS.out.bam
def bam_splitncigar = GATK4_SPLITNCIGARREADS.out.bam
ch_versions = ch_versions.mix(GATK4_SPLITNCIGARREADS.out.versions)

bam_splitncigar_interval = bam_splitncigar.map{ meta, bam_ -> [ meta + [id:meta.sample] - meta.subMap('sample'), bam_ ] }.groupTuple()
def bam_splitncigar_interval = bam_splitncigar
.map{ meta, bam_ ->
def new_meta = meta + [id:meta.sample] - meta.subMap('sample') - meta.subMap('interval_count')
[ groupKey(new_meta, meta.interval_count), bam_ ]
}
.groupTuple()

SAMTOOLS_MERGE(bam_splitncigar_interval,
SAMTOOLS_MERGE(
bam_splitncigar_interval,
fasta,
fai.map{ fai_ -> [[id:fai_.baseName], fai_] })
fai.map{ fai_ -> [[id:fai_.baseName], fai_] }
)

splitncigar_bam = SAMTOOLS_MERGE.out.bam
def splitncigar_bam = SAMTOOLS_MERGE.out.bam
ch_versions = ch_versions.mix(SAMTOOLS_MERGE.out.versions)

SAMTOOLS_INDEX(splitncigar_bam)
ch_versions = ch_versions.mix(SAMTOOLS_INDEX.out.versions)

splitncigar_bam_bai = splitncigar_bam
.join(SAMTOOLS_INDEX.out.bai, remainder: true)
.join(SAMTOOLS_INDEX.out.csi, remainder: true) // TODO fix this bottleneck
.map{meta, bam_, bai, csi ->
if (bai) [meta, bam_, bai]
else [meta, bam_, csi]
}
def splitncigar_bam_indices = SAMTOOLS_INDEX.out.bai
.mix(SAMTOOLS_INDEX.out.csi)
.mix(SAMTOOLS_INDEX.out.crai)

ch_versions = ch_versions.mix(SAMTOOLS_INDEX.out.versions)
def splitncigar_bam_bai = splitncigar_bam
.join(splitncigar_bam_indices, failOnDuplicate: true, failOnMismatch: true)

emit:
bam_bai = splitncigar_bam_bai
Expand Down
50 changes: 26 additions & 24 deletions workflows/rnavar/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ workflow RNAVAR {
def interval_list_split = Channel.empty()
if (!params.skip_intervallisttools) {
GATK4_INTERVALLISTTOOLS(interval_list)
interval_list_split = GATK4_INTERVALLISTTOOLS.out.interval_list.map{ _meta, bed -> [bed] }.flatten()
interval_list_split = GATK4_INTERVALLISTTOOLS.out.interval_list.map{ _meta, bed -> [bed] }.collect()
}
else {
interval_list_split = interval_list.map { _meta, bed -> bed }
Expand Down Expand Up @@ -160,13 +160,12 @@ workflow RNAVAR {
fasta,
fasta_fai)

def markduplicate_indices = BAM_MARKDUPLICATES_PICARD.out.bai
.mix(BAM_MARKDUPLICATES_PICARD.out.csi)
.mix(BAM_MARKDUPLICATES_PICARD.out.crai)

def genome_bam_bai = BAM_MARKDUPLICATES_PICARD.out.bam
.join(BAM_MARKDUPLICATES_PICARD.out.bai, remainder: true)
.join(BAM_MARKDUPLICATES_PICARD.out.csi, remainder: true) // TODO fix this bottleneck
.map{meta, bam, bai, csi ->
if (bai) [meta, bam, bai]
else [meta, bam, csi]
}
.join(markduplicate_indices, failOnDuplicate:true, failOnMismatch:true)
.mix(PREPARE_ALIGNMENT.out.bam)

//Gather QC ch_reports
Expand Down Expand Up @@ -259,7 +258,12 @@ workflow RNAVAR {
dbsnp_for_haplotypecaller_tbi = dbsnp_tbi.map{ tbi -> [[id:'dbsnp'], tbi] }
}

haplotypecaller_interval_bam = bam_variant_calling.combine(interval_list_split)
def haplotypecaller_interval_bam = bam_variant_calling.combine(interval_list_split)
.map { meta, bam, bai, interval_lists ->
def new_meta = meta + [interval_count: interval_lists instanceof List ? interval_lists.size() : 1]
[ new_meta, bam, bai, interval_lists ]
}
.transpose(by:3)
.map{ meta, bam, bai, interval_list_ ->
[ meta + [ id:meta.id + "_" + interval_list_.baseName, sample:meta.id, variantcaller:'haplotypecaller' ], bam, bai, interval_list_, [] ]
}
Expand All @@ -278,7 +282,13 @@ workflow RNAVAR {
dbsnp_for_haplotypecaller_tbi
)

def haplotypecaller_raw = GATK4_HAPLOTYPECALLER.out.vcf.map{ meta, vcf -> [ meta + [id:meta.sample] - meta.subMap('sample'), vcf ] }.groupTuple() // TODO fix this bottleneck
def haplotypecaller_out = GATK4_HAPLOTYPECALLER.out.vcf
.join(GATK4_HAPLOTYPECALLER.out.tbi, failOnMismatch:true, failOnDuplicate:true)
.map{ meta, vcf, tbi ->
def new_meta = meta + [id:meta.sample] - meta.subMap('sample', "interval_count")
[ groupKey(new_meta, meta.interval_count), vcf, tbi ]
}
.groupTuple()

ch_versions = ch_versions.mix(GATK4_HAPLOTYPECALLER.out.versions)

Expand All @@ -288,6 +298,7 @@ workflow RNAVAR {
// MODULE: MergeVCFS from GATK4
// Merge multiple VCF files into one VCF
//
def haplotypecaller_raw = haplotypecaller_out.map { meta, vcfs, _tbis -> [ meta, vcfs ]}
GATK4_MERGEVCFS(
haplotypecaller_raw,
dict
Expand All @@ -301,16 +312,14 @@ workflow RNAVAR {
TABIX(
haplotypecaller_vcf
)
ch_versions = ch_versions.mix(TABIX.out.versions)

def haplotypecaller_indices = TABIX.out.tbi
.mix(TABIX.out.csi)

def haplotypecaller_vcf_tbi = haplotypecaller_vcf
.join(TABIX.out.tbi, by: [0], remainder: true)
.join(TABIX.out.csi, by: [0], remainder: true) // TODO fix this bottleneck
.map{meta, vcf, tbi, csi ->
if (tbi) [meta, vcf, tbi]
else [meta, vcf, csi]
}
.join(haplotypecaller_indices, failOnDuplicate:true, failOnMismatch: true)

ch_versions = ch_versions.mix(TABIX.out.versions)
def final_vcf = Channel.empty()

//
Expand Down Expand Up @@ -363,20 +372,13 @@ workflow RNAVAR {
}

} else {
def combinegvcfs_input = GATK4_HAPLOTYPECALLER.out.vcf
.join(GATK4_HAPLOTYPECALLER.out.tbi, failOnMismatch:true, failOnDuplicate:true)
.map{ meta, vcf, tbi ->
def new_meta = meta + [id:meta.sample]
[new_meta, vcf, tbi]
}
.groupTuple() // TODO fix this bottleneck

//
// MODULE: CombineGVCFS from GATK4
// Merge multiple GVCF files into one GVCF
//
GATK4_COMBINEGVCFS(
combinegvcfs_input,
haplotypecaller_out,
fasta.map { _meta, fasta_ -> fasta_ },
fasta_fai.map { _meta, fai -> fai },
dict.map { _meta, dict_ -> dict_ }
Expand Down

0 comments on commit bfd3665

Please sign in to comment.