Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove instance id from accessioning and clustering #207

Merged
merged 3 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,10 @@ By default, the script will run all of these tasks, though you may specify a sub
Note that as some steps are long-running the script is best run in a screen/tmux session.

```bash
# To run everything - defaults to instance 1 for accessioning
# To run everything
python ingest_submission.py --eload 765

# Only accessioning - can specify instance if necessary
python ingest_submission.py --eload 765 --instance 1 --tasks accession

# Only variant load - accession instance id not needed
# Only variant load
python ingest_submission.py --eload 765 --tasks variant_load

# Only run VEP annotation - note this assumes variant load has been run
Expand Down
6 changes: 0 additions & 6 deletions bin/ingest_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@
def main():
argparse = ArgumentParser(description='Accession and ingest submission data into EVA')
argparse.add_argument('--eload', required=True, type=int, help='The ELOAD number for this submission.')
argparse.add_argument('--instance', required=False, type=int, choices=range(1, 13), default=2,
help='The instance id to use for accessioning. Only needed if running accessioning.')
argparse.add_argument('--clustering_instance', required=False, type=int, choices=range(1, 13), default=6,
help='The instance id to use for clustering. Only needed if running clustering.')
argparse.add_argument('--tasks', required=False, type=str, nargs='+',
default=EloadIngestion.all_tasks, choices=EloadIngestion.all_tasks,
help='Task or set of tasks to perform during ingestion.')
Expand All @@ -55,8 +51,6 @@ def main():
with EloadIngestion(args.eload) as ingestion:
ingestion.upgrade_config_if_needed()
ingestion.ingest(
instance_id=args.instance,
clustering_instance_id=args.clustering_instance,
tasks=args.tasks,
vep_cache_assembly_name=args.vep_cache_assembly_name,
resume=args.resume
Expand Down
18 changes: 3 additions & 15 deletions eva_submission/eload_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ def __init__(self, eload_number, config_object: EloadConfig = None):

def ingest(
self,
instance_id=None,
clustering_instance_id=None,
tasks=None,
vep_cache_assembly_name=None,
resume=False
Expand All @@ -80,8 +78,6 @@ def ingest(
do_accession = 'accession' in tasks
do_variant_load = 'variant_load' in tasks

if instance_id:
self.eload_cfg.set(self.config_section, 'accession', 'instance_id', value=instance_id)
if do_accession:
self.update_config_with_hold_date(self.project_accession)

Expand All @@ -91,7 +87,6 @@ def ingest(
self.run_accession_and_load_workflow(vcf_files_to_ingest, resume=resume, tasks=tasks)

if 'optional_remap_and_cluster' in tasks:
self.eload_cfg.set(self.config_section, 'clustering', 'instance_id', value=clustering_instance_id)
target_assembly = self._get_target_assembly()
if target_assembly:
self.eload_cfg.set(self.config_section, 'remap_and_cluster', 'target_assembly', value=target_assembly)
Expand Down Expand Up @@ -310,10 +305,9 @@ def _generate_csv_mappings_to_ingest(self):
return vcf_files_to_ingest

def run_accession_and_load_workflow(self, vcf_files_to_ingest, resume, tasks=None):
instance_id = self.eload_cfg.query(self.config_section, 'accession', 'instance_id')
output_dir = os.path.join(self.project_dir, project_dirs['accessions'])
accession_properties_file = self.create_accession_properties(
instance_id=instance_id, output_file_path=os.path.join(output_dir, 'accession.properties'))
output_file_path=os.path.join(output_dir, 'accession.properties'))
variant_load_properties_file = self.create_variant_load_properties(
output_file_path=os.path.join(self.project_dir, 'variant_load.properties'))
accession_import_properties_file = self.create_accession_import_properties(
Expand All @@ -322,7 +316,6 @@ def run_accession_and_load_workflow(self, vcf_files_to_ingest, resume, tasks=Non
accession_config = {
'valid_vcfs': vcf_files_to_ingest,
'project_accession': self.project_accession,
'instance_id': instance_id,
'vep_path': cfg['vep_path'],
'project_dir': str(self.project_dir),
'public_ftp_dir': cfg['public_ftp_dir'],
Expand All @@ -340,7 +333,6 @@ def run_accession_and_load_workflow(self, vcf_files_to_ingest, resume, tasks=Non
self.run_nextflow('accession_and_load', accession_config, resume, tasks)

def run_remap_and_cluster_workflow(self, target_assembly, resume):
clustering_instance = self.eload_cfg.query(self.config_section, 'clustering', 'instance_id')
scientific_name = self.eload_cfg.query('submission', 'scientific_name')
# this is where all the output will get stored - logs, properties, work dirs...
output_dir = os.path.join(self.project_dir, project_dirs['clustering'])
Expand All @@ -355,7 +347,6 @@ def run_remap_and_cluster_workflow(self, target_assembly, resume):
)
clustering_template_file = self.create_clustering_properties(
output_file_path=os.path.join(output_dir, 'clustering_template.properties'),
clustering_instance=clustering_instance,
target_assembly=target_assembly
)

Expand All @@ -370,7 +361,6 @@ def run_remap_and_cluster_workflow(self, target_assembly, resume):
'extraction_properties': extraction_properties_file,
'ingestion_properties': ingestion_properties_file,
'clustering_properties': clustering_template_file,
'clustering_instance': clustering_instance,
'remapping_config': cfg.config_file
}
for part in ['executable', 'nextflow', 'jar']:
Expand Down Expand Up @@ -472,9 +462,8 @@ def create_ingestion_properties(self, output_file_path, target_assembly):
open_file.write(properties)
return output_file_path

def create_clustering_properties(self, output_file_path, clustering_instance, target_assembly):
def create_clustering_properties(self, output_file_path, target_assembly):
properties = self.properties_generator.get_clustering_properties(
instance=clustering_instance,
target_assembly=target_assembly,
projects=self.project_accession,
rs_report_path=f'{target_assembly}_rs_report.txt'
Expand All @@ -483,9 +472,8 @@ def create_clustering_properties(self, output_file_path, clustering_instance, ta
open_file.write(properties)
return output_file_path

def create_accession_properties(self, instance_id, output_file_path):
def create_accession_properties(self, output_file_path):
properties = self.properties_generator.get_accessioning_properties(
instance=instance_id,
project_accession=self.project_accession,
taxonomy_accession=self.taxonomy
)
Expand Down
5 changes: 1 addition & 4 deletions eva_submission/nextflow/accession_and_load.nf
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ def helpMessage() {
--acc_import_job_props properties file for accession import job
--annotation_only whether to only run annotation job
--project_accession project accession
--instance_id instance id to run accessioning
--accession_job_props properties file for accessioning job
--public_ftp_dir public FTP directory
--accessions_dir accessions directory (for properties files)
Expand All @@ -24,7 +23,6 @@ def helpMessage() {

params.valid_vcfs = null
params.project_accession = null
params.instance_id = null
params.accession_job_props = null
params.public_ftp_dir = null
params.accessions_dir = null
Expand All @@ -48,12 +46,11 @@ params.help = null
if (params.help) exit 0, helpMessage()

// Test input files
if (!params.valid_vcfs || !params.project_accession || !params.instance_id || !params.accession_job_props ||\
if (!params.valid_vcfs || !params.project_accession || !params.accession_job_props ||\
!params.public_ftp_dir || !params.accessions_dir || !params.public_dir || !params.logs_dir || !params.taxonomy || \
!params.vep_path || !params.load_job_props || !params.acc_import_job_props || !params.project_dir) {
if (!params.valid_vcfs) log.warn('Provide a csv file with the mappings (vcf file, assembly accession, fasta, assembly report, analysis_accession, db_name) --valid_vcfs')
if (!params.project_accession) log.warn('Provide a project accession using --project_accession')
if (!params.instance_id) log.warn('Provide an instance id using --instance_id')
if (!params.accession_job_props) log.warn('Provide job-specific properties using --accession_job_props')
if (!params.taxonomy) log.warn('Provide taxonomy id using --taxonomy')
if (!params.public_ftp_dir) log.warn('Provide public FTP directory using --public_ftp_dir')
Expand Down
30 changes: 21 additions & 9 deletions eva_submission/vep_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import requests
from ebi_eva_common_pyutils.ncbi_utils import get_ncbi_assembly_dicts_from_term, \
retrieve_species_scientific_name_from_tax_id_ncbi
from requests import HTTPError
from retry import retry

from ebi_eva_common_pyutils.config import cfg
Expand Down Expand Up @@ -127,6 +128,17 @@ def get_vep_cache_version_from_ftp(assembly_accession, ensembl_assembly_name=Non
return None, None


@retry(tries=4, delay=2, backoff=1.2, jitter=(1, 3))
def resolve_ensembl_supported_assemblies(taxonomy_id):
    """Query Ensembl's REST API for the genome records of a given taxonomy id.

    Retries up to 4 times (with backoff and jitter) on failure, and raises
    requests.HTTPError for a non-2xx response. Returns the raw
    requests.Response on success; the payload may still be an empty JSON list
    when the taxon has no genomes in Ensembl.
    """
    # Now resolve the currently supported assembly for this species in Ensembl
    url = f'https://rest.ensembl.org/info/genomes/taxonomy/{taxonomy_id}?content-type=application/json'
    response = requests.get(url)
    # This endpoint returns 500 even if the taxon id is valid but just not in Ensembl;
    # raise here and let the caller interpret the HTTPError as "species not currently supported".
    response.raise_for_status()
    return response


@retry(tries=4, delay=2, backoff=1.2, jitter=(1, 3))
def get_species_and_assembly(assembly_acc):
"""
Expand All @@ -153,17 +165,17 @@ def get_species_and_assembly(assembly_acc):
taxonomy_id, assembly_name = taxid_and_assembly_name.pop()

# Now resolve the currently supported assembly for this species in Ensembl
url = f'https://rest.ensembl.org/info/genomes/taxonomy/{taxonomy_id}?content-type=application/json'
response = requests.get(url)
# This endpoint returns 500 even if taxon id is valid but just not in Ensembl, to minimise user frustration
# we'll assume the species is just not currently supported.
if not response.ok:
logger.warning(f'Got {response.status_code} when trying to get species and assembly from Ensembl.')
try:
response = resolve_ensembl_supported_assemblies(taxonomy_id)
except HTTPError as e:
# This endpoint returns 500 even if taxon id is valid but just not in Ensembl, to minimise user frustration
# we'll assume the species is just not currently supported.
logger.error(f'Got {e.response.status_code} when trying to get species and assembly from Ensembl with '
f'taxonomy {taxonomy_id}.')
return None, None, None, None
# Sometimes Ensembl responds with a 200 but still has no data
# See https://rest.ensembl.org/info/genomes/taxonomy/1010633?content-type=application/json
elif not response.json():
logger.warning(f'Ensembl return empty list when trying to get species and assembly.')
if not response.json():
logger.error(f'Ensembl returned an empty list when trying to get species and assembly.')
return None, None, None, None
# search through all the responses
current = False
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cached-property
cerberus
ebi-eva-common-pyutils[eva-internal]==0.6.6
ebi-eva-common-pyutils[eva-internal]==0.6.7
eva-vcf-merge==0.0.8
humanize
lxml
Expand Down
1 change: 0 additions & 1 deletion tests/nextflow-tests/run_tests_clustering.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ nextflow run ${SOURCE_DIR}/remap_and_cluster.nf -params-file test_ingestion_no_r
--extraction_properties ${SCRIPT_DIR}/template.properties \
--ingestion_properties ${SCRIPT_DIR}/template.properties \
--clustering_properties ${SCRIPT_DIR}/template.properties \
--clustering_instance 1 \
--output_dir ${SCRIPT_DIR}/output \
--logs_dir ${SCRIPT_DIR}/output/logs \
--remapping_config ${SCRIPT_DIR}/test_ingestion_config.yaml \
Expand Down
1 change: 0 additions & 1 deletion tests/nextflow-tests/run_tests_remapping_clustering.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ nextflow run ${SOURCE_DIR}/remap_and_cluster.nf -params-file test_ingestion_conf
--extraction_properties ${SCRIPT_DIR}/template.properties \
--ingestion_properties ${SCRIPT_DIR}/template.properties \
--clustering_properties ${SCRIPT_DIR}/template.properties \
--clustering_instance 1 \
--output_dir ${SCRIPT_DIR}/output \
--logs_dir ${SCRIPT_DIR}/output/logs \
--remapping_config ${SCRIPT_DIR}/test_ingestion_config.yaml \
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
project_accession: PRJEB12345
taxonomy: 1234
instance_id: 1
public_ftp_dir: ../../ftp
logs_dir: ../../../project/logs
public_dir: ../../../project/public
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
project_accession: PRJEB12345
instance_id: 1
public_ftp_dir: ../../ftp
logs_dir: ../../../project/logs
public_dir: ../../../project/public
Expand Down
3 changes: 1 addition & 2 deletions tests/test_eload_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def test_ingest_all_tasks(self):
m_get_species.return_value = 'homo_sapiens'
m_get_results.side_effect = default_db_results_for_ingestion()
m_get_tax.return_value = ('name', '9090')
self.eload.ingest(1)
self.eload.ingest()

def test_ingest_metadata_load(self):
with self._patch_metadata_handle(), \
Expand Down Expand Up @@ -222,7 +222,6 @@ def test_ingest_accession(self):
m_post.return_value.text = self.get_mock_result_for_ena_date()
m_get_results.side_effect = default_db_results_for_accession()
self.eload.ingest(
instance_id=1,
tasks=['accession']
)
assert os.path.exists(
Expand Down
8 changes: 4 additions & 4 deletions tests/test_vep_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ def fake_dir(path, callback):

def test_get_vep_versions_from_ensembl(self):
vep_version, cache_version = get_vep_and_vep_cache_version_from_ensembl('GCA_000827895.1')
self.assertEqual(vep_version, 111)
self.assertEqual(cache_version, 58)
self.assertEqual(vep_version, 112)
self.assertEqual(cache_version, 59)
assert os.path.exists(os.path.join(cfg['vep_cache_path'], 'thelohanellus_kitauei'))
assert os.listdir(os.path.join(cfg['vep_cache_path'], 'thelohanellus_kitauei')) == ['58_ASM82789v1']
assert os.listdir(os.path.join(cfg['vep_cache_path'], 'thelohanellus_kitauei')) == ['59_ASM82789v1']

def test_get_vep_versions_from_ensembl_not_found(self):
vep_version, cache_version = get_vep_and_vep_cache_version_from_ensembl('GCA_015220235.1')
Expand Down Expand Up @@ -156,5 +156,5 @@ def test_get_releases(self):
assert get_releases(ftp, 'pub', current_only=True) == {112: 'pub/release-112'}

with get_ftp_connection(ensembl_genome_ftp_url) as ftp:
assert get_releases(ftp, ensembl_genome_dirs[0], current_only=True) == {58: 'ensemblgenomes/pub/plants/release-58'}
assert get_releases(ftp, ensembl_genome_dirs[0], current_only=True) == {59: 'ensemblgenomes/pub/plants/release-59'}

Loading