-
Notifications
You must be signed in to change notification settings - Fork 6
/
vip_gvcf.nf
137 lines (119 loc) · 4.87 KB
/
vip_gvcf.nf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
nextflow.enable.dsl=2
include { parseCommonSampleSheet; getAssemblies } from './modules/sample_sheet'
include { getCramRegex; getGenomeVcfRegex } from './modules/utils'
include { validate as validate_gvcf } from './modules/gvcf/validate'
include { liftover as liftover_gvcf } from './modules/gvcf/liftover'
include { validate as validate_cram } from './modules/cram/validate'
include { scatter; validateGroup } from './modules/utils'
include { merge } from './modules/gvcf/merge'
include { vcf; validateVcfParams } from './vip_vcf'
include { bed_filter } from './modules/vcf/bed_filter'
/**
* input: [project, sample, ...]
*/
workflow gvcf {
take: meta
main:
meta
| flatMap { meta -> scatter(meta) }
| set { ch_inputs_scattered }
// joint variant calling per project, per chunk
ch_inputs_scattered
| map { meta -> [groupKey([*:meta].findAll { it.key != 'sample' }, meta.project.samples.size), meta.sample] }
| groupTuple(remainder: true, sort: { left, right -> left.index <=> right.index })
| map { key, group -> validateGroup(key, group) }
| map { meta, samples -> [meta, samples.collect { it.gvcf.data }, samples.collect { it.gvcf.index } ]}
| merge
| map { meta, vcf, vcfIndex, vcfStats -> [*:meta, vcf: [data: vcf, index: vcfIndex, stats: vcfStats]] }
| set { ch_vcf_per_chunk_called }
ch_vcf_per_chunk_called
| vcf
}
workflow {
def projects = parseSampleSheet(params)
def assemblies = getAssemblies(projects)
validateGenomeVcfParams(assemblies)
// run workflow for each sample in each project
Channel.from(projects)
| flatMap { project -> project.samples.collect { sample -> [project: project, sample: sample] } }
| set { ch_sample }
// validate gvcf and decide whether liftover if required
ch_sample
| map { meta -> [meta, meta.sample.gvcf] }
| validate_gvcf
| map { meta, gVcf, gVcfIndex, gVcfStats -> [meta, [data: gVcf, index: gVcfIndex, stats: gVcfStats]] }
| branch { meta, gVcf ->
bed_filter: meta.project.regions != null
ready: true
}
| set { ch_sample_validated }
//filter
ch_sample_validated.bed_filter
| map { meta, gVcf -> [meta, meta.project.regions, gVcf.data, gVcf.index, true] }
| bed_filter
| map { meta, gVcf, gVcfIndex, gVcfStats -> [meta, [data: gVcf, index: gVcfIndex, stats: gVcfStats]] }
| set { ch_sample_filtered }
Channel.empty().mix(ch_sample_filtered, ch_sample_validated.ready)
| branch { meta, vcf ->
liftover: meta.sample.assembly != params.assembly
ready: true
}
| set { ch_sample_liftover }
// liftover gvcf
ch_sample_liftover.liftover
| map { meta, gVcf -> [meta, gVcf.data] }
| liftover_gvcf
| map { meta, gVcf, gVcfIndex, gVcfStats, gVcfRejected, gVcfRejectedIndex, gVcfRejectedStats -> [meta, [data: gVcf, index: gVcfIndex, stats: gVcfStats]] }
| set { ch_sample_liftovered }
// merge vcf channels
Channel.empty().mix(ch_sample_liftovered, ch_sample_liftover.ready)
| map { meta, gVcf -> [*:meta, sample: [*:meta.sample, gvcf: gVcf]] }
| set { ch_sample_processed }
// update project assembly and samples
ch_sample_processed
| map { meta -> [groupKey([*:meta].findAll { it.key != 'sample' }, meta.project.samples.size), meta.sample] }
| groupTuple(remainder: true, sort: { left, right -> left.index <=> right.index })
| map { key, group -> validateGroup(key, group) }
| map { meta, samples -> [*:meta, project: [*:meta.project, assembly: params.assembly, samples: samples]] }
| set { ch_project_processed }
// map to updated samples
ch_project_processed
| flatMap { meta -> meta.project.samples.collect { sample -> [*:meta, sample: sample] } }
| gvcf
}
def validateGenomeVcfParams(assemblies) {
validateVcfParams(assemblies)
// general
def mergePreset = params.gvcf.merge_preset
if (!(mergePreset ==~ /gatk|gatk_unfiltered|DeepVariant|DeepVariant_unfiltered/)) exit 1, "parameter 'gvcf.merge_preset' value '${mergePreset}' is invalid. allowed values are [gatk, gatk_unfiltered, DeepVariant, DeepVariant_unfiltered]"
}
def parseSampleSheet(params) {
def cols = [
assembly: [
type: "string",
default: { 'GRCh38' },
enum: ['GRCh37', 'GRCh38', 'T2T']
],
gvcf: [
type: "file",
required: true,
regex: getGenomeVcfRegex()
],
cram: [
type: "file",
regex: getCramRegex()
]
]
def projects = parseCommonSampleSheet(params.input, params.hpo_phenotypic_abnormality, cols)
validate(projects)
return projects
}
def validate(projects) {
projects.each { project ->
project.samples.each { sample ->
if ((sample.assembly != params.assembly) && (sample.cram != null)) {
throw new IllegalArgumentException("line ${sample.index}: 'cram' column must be empty because input assembly '${sample.assembly}' differs from output assembly '${params.assembly}' (liftover not possible).")
}
}
}
}