Commit
fix some errors of static typing
chungmuen committed Oct 10, 2024
1 parent c025f74 commit 590595d
Showing 11 changed files with 119 additions and 107 deletions.
2 changes: 1 addition & 1 deletion seqteleporter/config.py
@@ -57,7 +57,7 @@


class DnaProviderSpec:
def __init__(self, provider_name, product_name, min_len, max_len):
def __init__(self, provider_name: str, product_name: str, min_len: int, max_len: int):
self.provider_name = provider_name
self.product_name = product_name
self.min_len = min_len
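The hunk above adds parameter annotations to DnaProviderSpec.__init__. A minimal sketch of what that buys when a checker such as mypy runs over call sites; the provider and product values below are made up for illustration:

```python
class DnaProviderSpec:
    def __init__(self, provider_name: str, product_name: str, min_len: int, max_len: int):
        self.provider_name = provider_name
        self.product_name = product_name
        self.min_len = min_len
        self.max_len = max_len


# Hypothetical call sites, not taken from the repository.
spec = DnaProviderSpec("some_provider", "gene_fragment", 300, 1800)   # accepted by the checker
bad = DnaProviderSpec("some_provider", "gene_fragment", "300", 1800)  # mypy flags arg 3: "str" where "int" expected
```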
6 changes: 3 additions & 3 deletions seqteleporter/fragment_assembler/fragment_assembler.py
@@ -60,13 +60,13 @@ def __init__(self, name, dna, aa, coding_start, coding_end, wt_aa, n_term_aa, c_
self.wt_aa = wt_aa
self.translated_dna_assembly = str(Seq(self.dna)[self.coding_start:self.coding_end].translate())

def validate_seq_length(self):
def validate_seq_length(self) -> bool:
if len(self.wt_aa) == len(self.aa):
return True
print(f'validate_seq_length() failed: len(self.wt_aa)={len(self.wt_aa)}; len(self.aa)={len(self.aa)}')
return False

def validate_name_with_aa(self):
def validate_name_with_aa(self) -> bool:
# the numbering of mutations does not consider the AAs encoded by the 5'DNA and 3'DNA, thus the offset.
n_term_offset = len(self.n_term_aa)
c_term_offset = len(self.c_term_aa)
@@ -92,7 +92,7 @@ def validate_translated_dna_with_aa(self):
print(f'validate_translated_dna_with_aa() failed: {difference}')
return False

def validate_assembled_fragment(self):
def validate_assembled_fragment(self) -> bool:
validated = \
self.validate_seq_length() & \
self.validate_name_with_aa() & \
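The validators above now declare `-> bool`. validate_assembled_fragment() combines them with `&` rather than `and`; a small sketch with stub validators (not the real ones) of why that matters: `&` evaluates every operand, so each failing check still prints its diagnostic, whereas `and` would stop at the first False.

```python
def check_length() -> bool:
    print('check_length() failed')
    return False


def check_name() -> bool:
    print('check_name() failed')
    return False


# Both messages are printed because & does not short-circuit;
# with `and`, check_name() would never run once check_length() returned False.
validated = check_length() & check_name()
print(validated)  # False
```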
34 changes: 25 additions & 9 deletions seqteleporter/fragment_assembler/plate_mapper.py
@@ -278,7 +278,7 @@ def plate_mapper(desired_variant_muts_list: list[list], desired_variant_names: O
def make_variant_mutation_notation_from_fragments(fragment_mut_notations: list[str]) -> str:
assembled_muts_notation = [re.sub("^.+?_|wild_type", "", muts_notation) for muts_notation in fragment_mut_notations]
assembled_muts_notation = list(filter(lambda x: x != "", assembled_muts_notation))
assembled_muts_notation_ls = []
assembled_muts_notation_ls: List = []
for mut_notation_per_frag in assembled_muts_notation:
assembled_muts_notation_ls = assembled_muts_notation_ls + mut_notation_per_frag.split('_')
assembled_muts_notation_ls.sort(key=lambda x: int(x[1:-1]))
@@ -296,7 +296,7 @@ def validate_plate_mapping_sheet(plate_mapping_sheet_file: str, desired_variant_
"""
# import data
xl = pd.ExcelFile(plate_mapping_sheet_file)
sheets = xl.sheet_names
sheets: List[str] = xl.sheet_names
plate_mapping_tbls = []
readme_df = pd.DataFrame()
for sheet in sheets:
@@ -364,10 +364,18 @@ def validate_plate_mapping_sheet(plate_mapping_sheet_file: str, desired_variant_
return (validated_readme and validated_plate_mapping_sheet)


def make_and_validate_plate_mapping_sheet(desired_variant_muts_list: list[list],
desired_variant_names: Optional[list[str]], fragment_sheet_path: str,
plate_format: int, aa_seq: str, backbone_len: int, enzyme: str,
five_prime_dna: str, three_prime_dna: str, start_plasmid_id: Optional[str]):
def make_and_validate_plate_mapping_sheet(
desired_variant_muts_list: list[list],
desired_variant_names: Optional[list[str]],
fragment_sheet_path: str,
plate_format: int,
aa_seq: str,
backbone_len: int,
enzyme: str,
five_prime_dna: str,
three_prime_dna: str,
start_plasmid_id: Optional[str]
) -> None:
# if encounter error msg: Can't find workbook in OLE2 compound document, remove excel sensitivity label and try again.
# ref: https://stackoverflow.com/questions/45725645/pandas-unable-to-open-this-excel-file
output_dir = path.dirname(fragment_sheet_path)
@@ -431,7 +439,12 @@ def find_column_index_by_column_name(sheet: workbook.workbook.Workbook, column_n
return column_index


def add_excel_volume_calc_formula_to_target_sheet(source_module_sheet_name, target_sheet_name, workbook_, dna_size):
def add_excel_volume_calc_formula_to_target_sheet(
source_module_sheet_name: str,
target_sheet_name: str,
workbook_: workbook.Workbook,
dna_size: int
) -> workbook.Workbook:

# identify the col_index of the Volume cells and Module name from the instruction sheets
target_sheet = workbook_[target_sheet_name]
@@ -467,7 +480,10 @@ def add_excel_volume_calc_formula_to_target_sheet(source_module_sheet_name, targ
return workbook_


def add_excel_volume_calc_formula_to_source_module_sheet(source_module_sheet_name, workbook_):
def add_excel_volume_calc_formula_to_source_module_sheet(
source_module_sheet_name: str,
workbook_: workbook.Workbook
) -> workbook.Workbook:

# identify the col_index of the Volume cells and Module name from the instruction sheets
full_mapping_sheet_name = 'source_plt_all_target_plt_all'
@@ -496,7 +512,7 @@ def add_excel_volume_calc_formula_to_source_module_sheet(source_module_sheet_nam
return workbook_


def batch_add_excel_volume_calc_formula(excel_file_path: str):
def batch_add_excel_volume_calc_formula(excel_file_path: str) -> None:
workbook_ = load_workbook(excel_file_path)

# identify DNA size
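Several hunks above annotate locals with a bare `List` (for example `assembled_muts_notation_ls: List = []`). That satisfies the checker but drops the element type; a parameterized annotation keeps it, as in this sketch (the mutation notations are made-up examples, not project data):

```python
from typing import List

assembled_muts_notation_ls: List[str] = []            # element type stated, not just List
for mut_notation_per_frag in ["A12T_G45S", "K78R"]:   # hypothetical notations
    assembled_muts_notation_ls += mut_notation_per_frag.split("_")

# Same sort key as make_variant_mutation_notation_from_fragments(): the numeric position.
assembled_muts_notation_ls.sort(key=lambda x: int(x[1:-1]))
print(assembled_muts_notation_ls)  # ['A12T', 'G45S', 'K78R']
```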
2 changes: 1 addition & 1 deletion seqteleporter/main_libs.py
@@ -56,7 +56,7 @@ def generate_and_optimize_ready_to_click_modules(input_table_path):
return outfile_paths


def assemble_modules_and_generate_robot_instruction(input_table_path: str, ready_to_click_modules_path: str):
def assemble_modules_and_generate_robot_instruction(input_table_path: str, ready_to_click_modules_path: str) -> None:
desired_variants_input = pd.read_excel(input_table_path, sheet_name='input_desired_variants', header=0,
index_col=None)
out_file_path_, inputs_dict = transform_input_excel_sheet_to_text_input(input_table_path)
@@ -126,26 +126,3 @@ def compute_ligation_fidelity(all_fusion_sites_of_a_partition: tuple,
total_lig_fidelity *= fidelity_of_this_junction

return total_lig_fidelity


if __name__ == "__main__":
# Example usage:
fidelity_data_path = r'C:\Users\GOFKV\PycharmProjects\proseqteleporter\proseqteleporter\data\neb_fidelity_data\FileS01_T4_01h_25C.xlsx'
fidelity_data_ = pd.read_excel(fidelity_data_path, index_col=0)
all_fusion_sites_of_a_partition_ = ('AAGG', 'ACTC', 'AGGA', 'AGTG', 'ATCA')

# Convert the DataFrame to a NumPy array
fidelity_data = fidelity_data_.values

# Create a mapping of fusion site names to their indices
fusion_site_indices = {site: idx for idx, site in enumerate(fidelity_data_.index)}
fusion_site_cols = {site: idx for idx, site in enumerate(fidelity_data_.columns)}
res = compute_ligation_fidelity(all_fusion_sites_of_a_partition=all_fusion_sites_of_a_partition_,
fidelity_data=fidelity_data,
fusion_site_indices=fusion_site_indices,
fusion_site_cols=fusion_site_cols)


# res = compute_ligation_fidelity(all_fusion_sites_of_a_partition=all_fusion_sites_of_a_partition_,
# fidelity_data=sf.FrameHE.from_pandas(fidelity_data_))
print(res)
52 changes: 35 additions & 17 deletions seqteleporter/partitioner/compute_best_partitions.py
@@ -37,7 +37,7 @@ def prepare_output_dirs(output_dir):
return log_dir, result_dir


def prepare_compute_best_partitions_params(input_file_path):
def prepare_compute_best_partitions_params(input_file_path: str) -> dict:
input_params = load_input_params(input_file_path=input_file_path, supress_output=True)
print(input_params['mutations_1idx'])
all_mutations_0idx, linked_mutations_0idx = prepare_0idx_mutations(
@@ -71,12 +71,21 @@ def prepare_compute_best_partitions_params(input_file_path):
the given constraints, utilizing experimental data for fusion site information."""


def write_compute_best_partitions_log_header(s: str, mutations_0idx: Union[list, None],
linked_mutations_0idx: Union[list, None],
fusion_sites_used_by_backbone: Tuple[str, ...], min_aa_length: int,
max_cost: int, max_unevenness: float, min_ligation_fidelity: float,
satisfaction_fidelity: float, search_method: str, host: str,
sort_by_cost: bool, compute_best_partitions_log_file_path: str):
def write_compute_best_partitions_log_header(
s: str,
mutations_0idx: Union[list, None],
linked_mutations_0idx: Union[list, None],
fusion_sites_used_by_backbone: Tuple[str, ...],
min_aa_length: int,
max_cost: int,
max_unevenness: float,
min_ligation_fidelity: float,
satisfaction_fidelity: float,
search_method: str,
host: str,
sort_by_cost: bool,
compute_best_partitions_log_file_path: str
) -> None:

with open(compute_best_partitions_log_file_path, 'a') as fd:
fd.write(f'\n Sequence: {s}'
@@ -93,12 +102,19 @@ def write_compute_best_partitions_log_header(s: str, mutations_0idx: Union[list,
f'\n Sort by cost: {sort_by_cost}')


def write_compute_best_partitions_log_body(compute_best_partitions_log_file_path: str, number_of_cuts: int,
elapsed_time_number_of_cuts: float, num_of_checked_partitions: int,
num_of_checked_unique_partitions:int, hard_constraint_violations: dict,
select_top_n_partitions: int, sel_partitions: List[dict],
mutations_0idx: Union[list, None], linked_mutations_0idx: Union[list, None],
supress_output: bool):
def write_compute_best_partitions_log_body(
compute_best_partitions_log_file_path: str,
number_of_cuts: int,
elapsed_time_number_of_cuts: float,
num_of_checked_partitions: int,
num_of_checked_unique_partitions: int,
hard_constraint_violations: dict,
select_top_n_partitions: int,
sel_partitions: List[dict],
mutations_0idx: Union[list, None],
linked_mutations_0idx: Union[list, None],
supress_output: bool
) -> None:

with open(compute_best_partitions_log_file_path, 'a') as fd:
compute_best_partitions_log_header = \
@@ -135,7 +151,7 @@ def write_compute_best_partitions_log_body(compute_best_partitions_log_file_path
print(compute_best_partitions_log_txt)


def validate_inputs(s: str, fusion_sites_used_by_backbone: Tuple[str, ...]):
def validate_inputs(s: str, fusion_sites_used_by_backbone: Tuple[str, ...]) -> None:
if not is_aa(s):
raise ValueError(f"The provided input sequence is not a valid amino acid sequence!")

@@ -294,9 +310,11 @@ def compute_best_partitions(s: str, mutations_0idx: Union[list, None], linked_mu
if len(partition['partition']) == 0:
expr = s
else:
expr = pretty_fragments_expression(fragments=partition["fragments"],
fragment_with_fusion_sites=fragment_with_fusion_sites,
fusion_site_len=ENZYME_INFO[enzyme]['fusion_site_length'])
expr = pretty_fragments_expression(
fragments=partition["fragments"],
fragment_with_fusion_sites=fragment_with_fusion_sites,
fusion_site_len=int(ENZYME_INFO[enzyme]['fusion_site_length'])
)
partition.update({
'expression': expr,
'fragment_with_fusion_sites': fragment_with_fusion_sites
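The compute_best_partitions() hunk wraps ENZYME_INFO[enzyme]['fusion_site_length'] in int() so the checker accepts it as an int. A hedged alternative, sketched here, is to declare the entry shape with a TypedDict so no cast is needed; the field names other than fusion_site_length are assumptions, not the project's actual schema:

```python
from typing import Dict, TypedDict


class EnzymeInfo(TypedDict):
    fusion_site_length: int
    recognition_site: str


ENZYME_INFO: Dict[str, EnzymeInfo] = {
    # BsaI leaves 4-nt overhangs; its recognition site is GGTCTC.
    "BsaI": {"fusion_site_length": 4, "recognition_site": "GGTCTC"},
}

fusion_site_len = ENZYME_INFO["BsaI"]["fusion_site_length"]  # already int to the checker, no int() cast
print(fusion_site_len)  # 4
```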
8 changes: 4 additions & 4 deletions seqteleporter/partitioner/partitioner.py
@@ -57,7 +57,7 @@ def find_cuttable_positions(s: str, mutations_0idx: Optional[List[Any]], linked_
return cuttable_positions


def find_even_cuts(string, regions):
def find_even_cuts(string: str, regions: List) -> List:
total_length = len(string)
n = len(regions) + 1 # Number of parts will be n
desired_length = total_length / n # Desired length of each part
@@ -130,13 +130,13 @@ def partitioner(s: str, cuttable_positions: list[int], number_of_cuts: int, muta
return partitions_list


def sort_cut_sites_by_eveness(string, regions):
def sort_cut_sites_by_eveness(string: str, regions: List) -> List[List]:
best_cuts = find_even_cuts(string, regions)
sorted_cuttable_sites = sort_indices_by_distance(regions, best_cuts)
return sorted_cuttable_sites


def sort_indices_by_distance(regions, best_cuts):
def sort_indices_by_distance(regions: List, best_cuts: List) -> List[List]:
sorted_indices = []

for i in range(len(regions)):
@@ -168,7 +168,7 @@ def generate_cut_ranges_from_a_mutation_distribution(mutation_distribution: dict
return allow_cut_ranges


def count_bases_in_a_mutation_distribution(s, distributed_mutations_0idx_lists):
def count_bases_in_a_mutation_distribution(s: str, distributed_mutations_0idx_lists: List) -> int:
base_counts = []
for idx, muts in enumerate(distributed_mutations_0idx_lists):
if idx == 0:
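The find_even_cuts() hunk above works from `desired_length = total_length / n`. A toy illustration of those target positions; the sequence and regions are hypothetical, and the snapping of targets into the allowed regions done by the real function is not reproduced here:

```python
string = "A" * 100                        # pretend 100-residue sequence
regions = [(10, 30), (40, 60), (70, 90)]  # hypothetical cuttable ranges
n_parts = len(regions) + 1                # number of fragments after cutting
desired_length = len(string) / n_parts    # ideal fragment length

targets = [round(desired_length * i) for i in range(1, n_parts)]
print(targets)  # [25, 50, 75] -> the cut points that give four equal parts
```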
76 changes: 38 additions & 38 deletions seqteleporter/post_partition_processor/post_partition_processor.py
@@ -4,7 +4,7 @@
from itertools import product
import re
import copy
from typing import Tuple, List, Union
from typing import Tuple, List, Union, Dict
import pandas as pd
from Bio.Restriction import *
from Bio.Seq import Seq, MutableSeq
@@ -188,7 +188,7 @@ def make_mutant_aa_fragments(fragment_n_and_c_term_dna: dict, mutations_0idx: li
mut_new_format.append([(d['position'], aa)])
mutations_0idx_reformat.update({(d['position'],): mut_new_format})

linked_mutations_reformat = {}
linked_mutations_reformat: Dict = {}
if linked_mutations_0idx:
for mut_set in linked_mutations_0idx:
key = tuple([mut[1] for mut in mut_set])
@@ -379,7 +379,7 @@ def find_enzyme_sites_in_dna(dna_seq: str, enzyme: str, print_result: bool) -> L
biopy_enzyme = globals()[enzyme]
fw_enzyme_site = biopy_enzyme.site
rv_enzyme_site = make_rev_compliment(fw_enzyme_site)
locs = []
locs: List = []
for enzyme_site in [fw_enzyme_site, rv_enzyme_site]:
enzyme_site_count = len(re.findall(enzyme_site, dna_seq))
if enzyme_site_count != 0:
@@ -693,41 +693,41 @@ def post_partition_processing(input_file_path: str, best_partitions_by_cut_numbe
min_dna_frag_length=min_dna_frag_length,
positions_include_wt_aa_0idx=positions_include_wt_aa_0idx)

-    if validate_partitioned_fragments_by_insilico_assembly(mutant_dna_fragments=mutant_dna_fragments,
-                                                            sample_number=validate_sample_number,
-                                                            wt_seq=input_params['s'],
-                                                            enzyme=input_params['enzyme'],
-                                                            five_prime_dna=input_params['five_prime_dna'],
-                                                            three_prime_dna=input_params['three_prime_dna'],
-                                                            coding_start=validate_coding_start):
-        mtp_format = input_params['module_plate_format']
-        if mtp_format not in PLATE_FORMATS.keys():
-            raise ValueError(f'Invalid MTP format. MTP format muts be one of these: {list(PLATE_FORMATS.keys())}')
-
-        outfile_path = path.join(path.dirname(path.dirname(best_partitions_by_cut_number_file)), 'results',
-                                 f'order_modules_{cut_number+1}fragments.xlsx')
-        export_module_ordering_sheet(
-            gene_abbreviation=input_params['gene_name'],
-            mutant_dna_fragments=mutant_dna_fragments,
-            row_range=PLATE_FORMATS[mtp_format]['rows'],
-            column_range=PLATE_FORMATS[mtp_format]['columns'],
-            enzyme=input_params['enzyme'],
-            output_file=outfile_path
-        )
-
-        exact_cost = find_exact_cost(mutant_dna_fragments=mutant_dna_fragments,
-                                     price_per_base=cost_per_nt)
-        print(
-            f'\n\033[1m'
-            f'\n================================================================================================'
-            f'\n SUCCESSFULLY GENERATED MODULES! '
-            f'\n Number of fragments: {cut_number+1} '
-            f'\n Estimated cost: {exact_cost} €'
-            f'\n================================================================================================'
-            f'\033[0m'
-        )
-
-        return mutant_aa_fragments, mutant_dna_fragments, outfile_path
-
-    else:
-        raise ValueError('validate_partitioned_fragments_by_insilico_assembly() failed')
+    if not validate_partitioned_fragments_by_insilico_assembly(
+            mutant_dna_fragments=mutant_dna_fragments,
+            sample_number=validate_sample_number,
+            wt_seq=input_params['s'],
+            enzyme=input_params['enzyme'],
+            five_prime_dna=input_params['five_prime_dna'],
+            three_prime_dna=input_params['three_prime_dna'],
+            coding_start=validate_coding_start
+    ):
+        raise ValueError('validate_partitioned_fragments_by_insilico_assembly() failed')
+
+    mtp_format = input_params['module_plate_format']
+    if mtp_format not in PLATE_FORMATS.keys():
+        raise ValueError(f'Invalid MTP format. MTP format muts be one of these: {list(PLATE_FORMATS.keys())}')
+    outfile_path = path.join(path.dirname(path.dirname(best_partitions_by_cut_number_file)), 'results',
+                             f'order_modules_{cut_number + 1}fragments.xlsx')
+    export_module_ordering_sheet(
+        gene_abbreviation=input_params['gene_name'],
+        mutant_dna_fragments=mutant_dna_fragments,
+        row_range=PLATE_FORMATS[mtp_format]['rows'],
+        column_range=PLATE_FORMATS[mtp_format]['columns'],
+        enzyme=input_params['enzyme'],
+        output_file=outfile_path
+    )
+    exact_cost = find_exact_cost(mutant_dna_fragments=mutant_dna_fragments,
+                                 price_per_base=cost_per_nt)
+    print(
+        f'\n\033[1m'
+        f'\n================================================================================================'
+        f'\n SUCCESSFULLY GENERATED MODULES! '
+        f'\n Number of fragments: {cut_number + 1} '
+        f'\n Estimated cost: {exact_cost} €'
+        f'\n================================================================================================'
+        f'\033[0m'
+    )
+
+    return mutant_aa_fragments, mutant_dna_fragments, outfile_path

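The large post_partition_processing() hunk above replaces an if/else wrapped around the whole happy path with an early raise. The pattern, reduced to a minimal sketch:

```python
def process(validated: bool) -> str:
    # Guard clause: fail fast, keep the main flow at one indentation level.
    if not validated:
        raise ValueError("validate_partitioned_fragments_by_insilico_assembly() failed")
    return "modules generated"


print(process(True))   # modules generated
# process(False) would raise ValueError
```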
2 changes: 1 addition & 1 deletion seqteleporter/utils/idt_tools.py
@@ -63,7 +63,7 @@ def idt_complexity_screener(dna: List[Dict[str, str]], idt_credentials: Type[Idt
return result_list


def get_access_token(client_id, client_secret, idt_username, idt_password):
def get_access_token(client_id: str, client_secret: str, idt_username: str, idt_password: str) -> str:
"""
Create the HTTP request, transmit it, and then parse the response for the
access token.
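get_access_token() now declares its str parameters and return type. A hedged sketch of such a typed token fetch; the endpoint URL, grant type, basic-auth scheme, and response field below are assumptions for illustration, not details taken from this commit or from IDT's API:

```python
import requests


def get_access_token(client_id: str, client_secret: str,
                     idt_username: str, idt_password: str) -> str:
    response = requests.post(
        "https://example.invalid/connect/token",     # placeholder endpoint, not the real one
        data={"grant_type": "password",              # assumed grant type
              "username": idt_username,
              "password": idt_password},
        auth=(client_id, client_secret),             # assumed client-credential scheme
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["access_token"]           # assumed response field
```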