-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
148 lines (128 loc) · 4.17 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from Bio import SeqIO
import os, io
import pandas as pd
from codecs import decode
import subprocess as sp
def read_fasta(fastafile: str)->list:
'''
Reads fastafile
:param fastafile: file path
:return: list of fasta record
'''
recordlist = []
# open file
with open(fastafile) as handle:
for record in SeqIO.parse(handle, "fasta"):
recordlist.append(record)
return recordlist
def write_fasta(path: str, recordlist: list)->None:
'''
Writes a fasta file to a given location
:param path: location to write fasta
:param recordlist: list of fasta records
:return: None
'''
with open(path, "w") as output_handle:
SeqIO.write(recordlist, output_handle, "fasta")
def makedir(path: str, force_make: bool=False)->str:
'''
Makes a directory if the given direcotry doesn't exist. If force_make true, makes a directory with a number
:param path: location of new directory
:param force_make: To make a new directory with a number if a directory already exists at given path
:return: path to new dir
'''
try:
os.mkdir(path)
return path
except:
if not force_make:
return path
i = 1
if force_make:
while True:
new_name = f"{path}({i})"
try:
os.mkdir(new_name)
break
except:
i += 1
return new_name
def fasta_breaker(fasta: list, tempdir: str)-> None:
'''
Writes a fasta list as individual files under contig 1 to n number of records.
:param fasta: fasta records list
:param tempdir: directory to write records to
:return: None
'''
records = read_fasta(fasta)
i = 1
for contig in records:
fasta_location = f"{tempdir}/contig_{i}"
write_fasta(fasta_location, contig)
i += 1
def read_fasta_single(fastafile: str)->object:
'''
Returns a single fasta record (only use for fasta with a single record)
:param fastafile: path to fastafile
:return: return fasta record
'''
with open(fastafile) as handle:
for record in SeqIO.parse(handle, "fasta"):
return record
def blast2df(result: object)->object:
'''
Takes output from blast_run and converts to a dataframe
:param result: output from blast_run
:return: dataframe
'''
col_names=["qtitle", "query_length", "subject_length", "alignment_length", "query_coverage", "percent_identical", "e value", "qstart", "qend"]
df=pd.read_csv(
io.StringIO(decode(result)),
names=col_names
)
df["Match_base_pid"]=df["query_coverage"]*df["percent_identical"]/100
df=df.drop_duplicates(subset="qtitle")
return df
def blast_run(query: str, db_path: str, perc:float=80.0, eval: float=0.00001, threads: int=1)->object:
'''
Runs blastn and returns a dataframe
:param query: path to query fasta
:param db_path: path to database for blasting
:param perc: minimum percent id
:param eval: minumum e-value
:param threads: number of threads to use
:return: dataframe
'''
cmd=f"blastn -query {query} -perc_identity {perc} -evalue {eval} -num_threads {threads} " \
f"-outfmt '10 sseqid qlen slen length qcovus pident evalue qstart qend' -db {db_path}"
proc1=sp.run(cmd, shell=True, stdout=sp.PIPE)
df=blast2df(proc1.stdout)
return df
def write_txt(path: str, text: str)-> None:
'''
writes a string
:param path: path to txt
:param text: string to write
:return: None
'''
with open(path, "w") as f:
f.write(text)
def plasclust2fasta(contig_arr, outdir):
rep_arr=[]
for contig in contig_arr:
if contig[4]:
rep_arr.append(contig[6])
name='plasmid_'
for rep in rep_arr:
name+=str(rep)+'_'
seq_arr=[]
for contig in contig_arr:
seq_arr.append(read_fasta_single(contig[0]))
location=f"{outdir}/{name}.fasta"
write_fasta(location, seq_arr)
def unclust2fasta(contig_arr, outdir):
filepath=f"{outdir}/unclustered_plas_contigs.fasta"
seq_arr=[]
for contig in contig_arr:
seq_arr.append(read_fasta_single(contig[0]))
write_fasta(filepath, seq_arr)