-
Notifications
You must be signed in to change notification settings - Fork 1
/
break_scaffolds.py
131 lines (102 loc) · 3.75 KB
/
break_scaffolds.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import argparse
import pathlib
# Local imports.
import fasta
import agp
def setup():
parser = argparse.ArgumentParser(
description="Split a scaffold into its component contigs."
)
parser.add_argument(
"-i",
"--input",
required=True,
type=pathlib.Path,
help="Input FASTA file with scaffolded sequences.",
)
return parser.parse_args()
def break_scaffold(sequence: str, n: int = 10) -> list:
"""
We want to split scaffolds whenever there is a run of Ns. To achieve this,
we scan a sequence from left to right and keep track of the number of
consecutive Ns. If the number ever exceeds the threshold, we save the
sequence, and wait until the next non N base pair is seen to start saving
the next sequence.
Args:
sequence: A sequence of nucleotides.
n: The number of N's in a row we need to observe before breaking a
scaffold.
Returns:
A list of strings, where each string is set of nucleotides found before
or after a series of N's.
"""
contigs = []
n_count = 0
current = ""
for base in sequence:
current += base
# If the base is an N, add to the N counter, otherwise reset it.
if base != "N":
n_count = 0
else:
n_count += 1
# If the N count is greater than the threshold add the contig.
if n_count >= n:
if len(current[:-10]) > 0:
contigs.append(current[:-10])
current = ""
# Add the last contig if it was not a run of Ns.
if len(current) > 0:
contigs.append(current)
return contigs
def get_contigs(sequences: dict, n: int = 10) -> tuple:
"""
Given a set of scaffolded sequences, break the sequences at runs of Ns and
then return a list of subsequences and the scaffolds they belonged to.
Args:
sequences: A sequence of nucleotides.
n: The number of N's in a row we need to observe before breaking a
scaffold.
Returns:
Two dictionaries. First, a dictionary representation of the scaffolded
assembly which can be later read by agp.write() to create an AGP file.
Second, a dictionary with contig names and sequences so that we can use
them later to align against the reference genome.
"""
contigs = {}
assembly = {}
for scaffold in sequences:
# If there are spaces in the seqeunce header, drop everything after the
# first one.
scaffold_name = scaffold.split(" ")[0]
assembly[scaffold_name] = {}
# Split the scaffold sequence at Ns.
contig_sequences = break_scaffold(sequences[scaffold])
# Record each contig found in the scaffold.
contig_index = 1
for sequence in contig_sequences:
contig_name = f"{scaffold_name}.{contig_index}"
contigs[contig_name] = sequence
assembly[scaffold_name][contig_name] = {
"start": 1,
"end": len(sequence),
"orientation": "+",
}
# Iterate contig index.
contig_index += 1
return assembly, contigs
def main(infile, outdir):
# Split scaffolds into contigs.
print("Generating contigs from scaffolds.")
scaffolds = fasta.read(infile)
assembly, contigs = get_contigs(scaffolds)
# Write contigs to FASTA.
fasta_file = pathlib.Path(f"{infile.stem}_contigs.fasta")
fasta.write(contigs, fasta_file)
# Write the AGP file.
agp_file = outdir.joinpath(f"{infile.stem}_assembly.agp")
agp.write(assembly, agp_file)
return fasta_file, agp_file
if __name__ == "__main__":
arguments = setup()
main(infile=arguments.input, outdir=pathlib.Path.cwd())