diff --git a/metadb/alembic/versions/4f93dc7aa8e8_create_field_set_table.py b/metadb/alembic/versions/4f93dc7aa8e8_create_field_set_table.py
new file mode 100644
index 0000000..eeecfb7
--- /dev/null
+++ b/metadb/alembic/versions/4f93dc7aa8e8_create_field_set_table.py
@@ -0,0 +1,55 @@
+"""create field_set table
+
+Revision ID: 4f93dc7aa8e8
+Revises: 4631177f4ecc
+Create Date: 2017-05-25 11:22:32.344826
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '4f93dc7aa8e8'
+down_revision = '4631177f4ecc'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    # making ontology terms strings for now
+    # leaving out externalId, diseases, pheno, etc. mappings for now
+    op.drop_table('field')
+    op.create_table(
+        'field_set',
+        sa.Column('id', sa.BigInteger, primary_key=True),
+        sa.Column('guid', sa.String(36), nullable=False, unique=True),
+        sa.Column('description', sa.Text)
+    )
+    op.create_table(
+        'field',
+        sa.Column('id', sa.BigInteger, primary_key=True),
+        sa.Column('guid', sa.String(36), nullable=False, unique=True),
+        sa.Column('field', sa.String(32), nullable=False),
+        sa.Column('field_set_id', sa.BigInteger, sa.ForeignKey('field_set.id'), nullable=False),
+        sa.Column('type', sa.Enum('Integer', 'String', 'Float', 'Flag', name='type_enum')),
+        sa.Column('is_filter', sa.Boolean, nullable=False),
+        sa.Column('is_format', sa.Boolean, nullable=False),
+        sa.Column('is_info', sa.Boolean, nullable=False),
+        sa.Column('length_type', sa.Enum('A', 'R', 'G', 'VAR', 'NUM', name='length_enum')),
+        sa.Column('length_intval', sa.Integer, default=1, server_default=sa.text('1')),
+        sa.Column('field_combine_op', sa.Enum('sum', 'mean', 'median', 'move_to_FORMAT', 'element_wise_sum', 'concatenate', name='field_combine_optype'))
+    )
+    op.create_unique_constraint('unique_name_per_field_set_constraint', 'field', ['field_set_id', 'field'])
+    op.add_column(u'db_array', sa.Column('field_set_id', sa.BigInteger, sa.ForeignKey('field_set.id'), nullable=False))
+
+def downgrade():
+    op.drop_constraint('unique_name_per_field_set_constraint', 'field', type_='unique')
+    op.drop_table('field')
+    op.drop_table('field_set')
+    op.create_table(
+        'field',
+        sa.Column('id', sa.BigInteger, primary_key=True),
+        sa.Column('field', sa.Text, nullable=False)
+    )
+    op.drop_column(u'db_array', 'field_set_id')
diff --git a/metadb/api/dbimport.py b/metadb/api/dbimport.py
index e190f9e..16c08b7 100644
--- a/metadb/api/dbimport.py
+++ b/metadb/api/dbimport.py
@@ -31,6 +31,8 @@
 from metadb.models import Individual
 from metadb.models import Reference
 from metadb.models import ReferenceSet
+from metadb.models import Field
+from metadb.models import FieldSet
 from metadb.models import Sample
 from metadb.models import VariantSet
 from metadb.models import Workspace
@@ -62,6 +64,7 @@ class Import():

     def __init__(self, db):
         self.db = db
+        self.length_type_map = {-1: 'A', -2: 'G', -3: 'R', None: 'VAR'}  # PyVCF maps VCF Number=A/G/R to -1/-2/-3 and '.' to None

     def __enter__(self):
         self.session = self.db.Session()
@@ -134,6 +137,94 @@ def registerReference(self, guid, reference_set_id, name, length):

         return reference

+    def registerFieldSet(self, guid, reader=None, description=None):
+        fieldSet = self.session.query(FieldSet).filter(FieldSet.guid == guid).first()
+        if (fieldSet is not None):
+            return fieldSet
+        fieldMap = {}  # field name -> list of FILTER/FORMAT/INFO header entries
+
+        try:
+            fieldSet = FieldSet(guid=guid, description=description)
+            self.session.add(fieldSet)
+            self.session.commit()
+
+        except exc.DataError as e:
+            self.session.rollback()
+            raise ValueError("{0} : {1} ".format(str(e), guid))
+
+        if (reader is not None):
+            if ((reader.filters is not None) and (len(reader.filters) > 0)):
+                for filter_name, filter_item in reader.filters.items():
+                    item_list = []
+                    item_list.append(filter_item)
+                    fieldMap[filter_name] = item_list
+
+            if ((reader.formats is not None) and (len(reader.formats) > 0)):
+                for format_name, format_item in reader.formats.items():
+                    if (format_name in fieldMap):
+                        fieldMap[format_name].append(format_item)
+                    else:
+                        item_list = []
+                        item_list.append(format_item)
+                        fieldMap[format_name] = item_list
+
+            if ((reader.infos is not None) and (len(reader.infos) > 0)):
+                for info_name, info_item in reader.infos.items():
+                    if (info_name in fieldMap):
+                        fieldMap[info_name].append(info_item)
+                    else:
+                        item_list = []
+                        item_list.append(info_item)
+                        fieldMap[info_name] = item_list
+
+        if (len(fieldMap) > 0):
+            for field_name, item_list in fieldMap.items():
+                self.registerField(str(uuid.uuid4()), fieldSet.id, field_name, item_list)
+
+        return fieldSet
+
+    def registerField(self, guid, field_set_id, field_name, field_item_list):
+        field = self.session.query(Field).filter(
+            and_(Field.field_set_id == field_set_id, Field.field == field_name)).first()
+
+        if field is None:
+            try:
+                field = Field()
+                field.is_filter = False
+                field.is_format = False
+                field.is_info = False
+                field.guid = guid
+                field.field = field_name
+                field.field_set_id = field_set_id
+
+                for field_item in field_item_list:  # PyVCF _Filter/_Format/_Info namedtuples
+                    if ('type' in field_item.__dict__.keys()):
+                        field.type = field_item.type
+                    if ('Filter' in str(type(field_item))):
+                        field.is_filter = True
+                    if ('Format' in str(type(field_item))):
+                        field.is_format = True
+                    if ('Info' in str(type(field_item))):
+                        field.is_info = True
+                    if ('num' in field_item.__dict__.keys()):
+                        number = field_item.num
+                        if ((number is None) or (number < 0)):
+                            field.length_type = self.length_type_map[number]
+                        else:
+                            field.length_type = 'NUM'
+                            field.length_intval = number
+                    if ('VCF_field_combine_operation' in field_item.__dict__.keys()):
+                        field.field_combine_op = field_item.VCF_field_combine_operation
+
+                self.session.add(field)
+                self.session.commit()
+
+            except exc.DataError as e:
+                self.session.rollback()
+                raise ValueError("{0} : {1} ".format(str(e), guid))
+
+        return field
+
     def registerWorkspace(self, guid, name):
         """
         Registers a workspace.
@@ -164,7 +255,32 @@ def registerWorkspace(self, guid, name):

         return workspace

-    def registerDBArray(self, guid, reference_set_id, workspace_id, name):
+    def queryDBArray(self, workspace_name, dbarray_name):
+        dba = None
+        vs = None
+        rs = None
+        fs = None
+        try:
+            workspace = self.session.query(Workspace).filter(Workspace.name == workspace_name).first()
+            if workspace is not None:
+                wsid = workspace.id
+                dba = self.session.query(DBArray).filter(
+                    and_(DBArray.workspace_id == wsid,\
+                         DBArray.name == dbarray_name))\
+                    .first()
+                if dba is not None:
+                    rsid = dba.reference_set_id
+                    fsid = dba.field_set_id
+                    rs = self.session.query(ReferenceSet).filter(ReferenceSet.id == rsid).first()
+                    fs = self.session.query(FieldSet).filter(FieldSet.id == fsid).first()
+                    vs = self.session.query(VariantSet).filter(VariantSet.reference_set_id == rsid).first()
+        except exc.DataError as e:
+            self.session.rollback()
+            raise ValueError("{0} : {1} : {2} ".format(str(e), workspace_name, dbarray_name))
+        return dba, vs, rs, fs
+
+    def registerDBArray(self, guid, reference_set_id, field_set_id, workspace_id, name):
         """
         Registers a DBArray.
-        An array is unique named folder in a unique workspace path and a given reference id.
+        An array is a uniquely named folder in a unique workspace path for a given reference set and field set.
@@ -175,6 +291,7 @@ def registerDBArray(self, guid, reference_set_id, workspace_id, name):
         dbarray = self.session.query(DBArray).filter(
             and_(DBArray.reference_set_id == reference_set_id,\
                  DBArray.workspace_id == workspace_id,\
+                 DBArray.field_set_id == field_set_id,\
                  DBArray.name == name))\
             .first()

@@ -183,6 +300,7 @@
             dbarray = DBArray(
                 guid=guid,
                 reference_set_id=reference_set_id,
+                field_set_id=field_set_id,
                 workspace_id=workspace_id,
                 name=name
             )
diff --git a/metadb/api/query.py b/metadb/api/query.py
index 9f14386..8a135f8 100644
--- a/metadb/api/query.py
+++ b/metadb/api/query.py
@@ -28,6 +28,7 @@
 from itertools import chain

 from metadb.models import Field
+from metadb.models import FieldSet
 from metadb.models import Individual
 from metadb.models import Workspace
 from metadb.models import DBArray
diff --git a/metadb/api/test/test_dbimport.py b/metadb/api/test/test_dbimport.py
index 2387341..932ed31 100644
--- a/metadb/api/test/test_dbimport.py
+++ b/metadb/api/test/test_dbimport.py
@@ -292,20 +292,20 @@ def test_registerDBArray(self):

         # new array
         result = session.registerDBArray(
-            aguid, self.referenceset.id, self.workspace.id, self.array)
+            aguid, self.referenceset.id, self.fieldset.id, self.workspace.id, self.array)
         assert result.name == self.array
         assert result.guid == aguid

         # registered array
         reg_result = session.registerDBArray(
-            str(uuid.uuid4()), self.referenceset.id, self.workspace.id, self.array)
+            str(uuid.uuid4()), self.referenceset.id, self.fieldset.id, self.workspace.id, self.array)
         assert reg_result.name == self.array
         assert reg_result.guid == aguid

         # negative
         with pytest.raises(ValueError) as exec_info:
             neg_result = session.registerDBArray(
-                fguid, self.referenceset.id, self.workspace.id, "negative")
+                fguid, self.referenceset.id, self.fieldset.id, self.workspace.id, "negative")
         assert "DataError" in str(exec_info.value)

     def test_registerSample(self):
@@ -405,9 +405,9 @@ def setUpClass(self):
         self.workspace = session.registerWorkspace(
             str(uuid.uuid4()), "/test/dbimport/workspace")
         self.array = session.registerDBArray(
-            str(uuid.uuid4()), self.referenceset.id, self.workspace.id, "test")
+            str(uuid.uuid4()), self.referenceset.id, self.fieldset.id, self.workspace.id, "test")
         self.array2 = session.registerDBArray(
-            str(uuid.uuid4()), self.referenceset.id, self.workspace.id, "test2")
+            str(uuid.uuid4()), self.referenceset.id, self.fieldset.id, self.workspace.id, "test2")
         self.variantset = session.registerVariantSet(
             str(uuid.uuid4()), self.referenceset.id, "Dataset")
diff --git a/metadb/models/__init__.py b/metadb/models/__init__.py
index e873290..4f36c4c 100644
--- a/metadb/models/__init__.py
+++ b/metadb/models/__init__.py
@@ -42,6 +42,7 @@ def get_tiledb_padded_reference_length_string_default(reference_length_str):
 from .inc_counter import BigInteger
 from .inc_counter import autoinc_handler
 from .field import Field
+from .field_set import FieldSet
 from .reference_set import ReferenceSet
 from .reference import Reference
 from .source_accession import SourceAccession
diff --git a/metadb/models/db_array.py b/metadb/models/db_array.py
index 6b3a8b5..29e5cc4 100644
--- a/metadb/models/db_array.py
+++ b/metadb/models/db_array.py
@@ -31,6 +31,7 @@ class DBArray(_Base):
     id = sa.Column(BigInteger, primary_key=True)
     guid = sa.Column(sa.String(36), nullable=False, unique=True)
     reference_set_id = sa.Column(BigInteger,
                                  sa.ForeignKey('reference_set.id'), nullable=False)
+    field_set_id = sa.Column(BigInteger, sa.ForeignKey('field_set.id'), nullable=False)
     workspace_id = sa.Column(BigInteger, sa.ForeignKey('workspace.id'), nullable=False)
     # num_rows that exist in a given array - must be incremented after a new row is added
     # When creating a new array, by default no rows exist - hence, num_rows=0
diff --git a/metadb/models/field.py b/metadb/models/field.py
index 18d6fca..df20a75 100644
--- a/metadb/models/field.py
+++ b/metadb/models/field.py
@@ -22,9 +22,23 @@
 from ..models import _Base, BigInteger
 import sqlalchemy as sa
-
+import enum

 class Field(_Base):
     __tablename__ = "field"
-    id = sa.Column(BigInteger, primary_key=True)
-    field = sa.Column(sa.Text, nullable=False)
+    id = sa.Column(BigInteger, primary_key=True)
+    guid = sa.Column(sa.String(36), nullable=False, unique=True)
+    field = sa.Column(sa.String(32), nullable=False)
+    field_set_id = sa.Column(BigInteger, sa.ForeignKey('field_set.id'), nullable=False)
+    # Unique constraint on (field_set_id, field)
+    __table_args__ = (
+        sa.UniqueConstraint('field_set_id', 'field',
+                            name='unique_name_per_field_set_constraint'),
+    )
+    type = sa.Column(sa.Enum('Integer', 'String', 'Float', 'Flag', name='type_enum'))
+    is_filter = sa.Column(sa.Boolean, nullable=False)
+    is_format = sa.Column(sa.Boolean, nullable=False)
+    is_info = sa.Column(sa.Boolean, nullable=False)
+    length_type = sa.Column(sa.Enum('A', 'R', 'G', 'VAR', 'NUM', name='length_enum'))
+    length_intval = sa.Column(sa.Integer, default=1, server_default=sa.text('1'))
+    field_combine_op = sa.Column(sa.Enum('sum', 'mean', 'median', 'move_to_FORMAT', 'element_wise_sum', 'concatenate', name='field_combine_optype'))
diff --git a/metadb/models/field_set.py b/metadb/models/field_set.py
new file mode 100644
index 0000000..7081c66
--- /dev/null
+++ b/metadb/models/field_set.py
@@ -0,0 +1,34 @@
+"""
+    The MIT License (MIT)
+    Copyright (c) 2016 Intel Corporation
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy of
+    this software and associated documentation files (the "Software"), to deal in
+    the Software without restriction, including without limitation the rights to
+    use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+    the Software, and to permit persons to whom the Software is furnished to do so,
+    subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included in all
+    copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+    FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+    COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+    IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+    CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+""" + +from ..models import _Base, BigInteger +import sqlalchemy as sa +from sqlalchemy.orm import relationship, backref + + +class FieldSet(_Base): + __tablename__ = "field_set" + id = sa.Column(BigInteger, primary_key=True) + guid = sa.Column(sa.String(36), nullable=False, unique=True) + description = sa.Column(sa.Text) + arrays = relationship('DBArray', backref='field_set') + fields = relationship('Field', backref='field_set') diff --git a/requirements.txt b/requirements.txt index f8c4a23..23b30af 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,3 +26,5 @@ alembic==0.8.2 psycopg2==2.6.1 PyVCF==0.6.8 pysam==0.9.0 +sqlalchemy_schemadisplay==1.3 +enum34==1.1.6 diff --git a/utils/helper.py b/utils/helper.py index 9baa4fc..96d5a37 100644 --- a/utils/helper.py +++ b/utils/helper.py @@ -150,10 +150,11 @@ def writeVIDMappingFile(DB_URI, reference_set_id, output_file, fields_dict=Const writeJSON2File(vid_mapping, output_file) -def registerWithMetadb(config, vcf=False, references=OrderedDict()): +def registerWithMetadb(config, reader=None, vcf=False): """ Registers parent object of a callset in metadb for both MAF and VCF importing. """ + if not vcf: # set MAF specific vars with open(config.TileDBAssembly) as config_file: @@ -174,24 +175,29 @@ def registerWithMetadb(config, vcf=False, references=OrderedDict()): dbimport = DBImport(config['dburi']) with dbimport.getSession() as metadb: - # register workspace, referenceset, array, and variantset - ws = metadb.registerWorkspace( - str(uuid.uuid4()), workspace) - rs = metadb.registerReferenceSet( - str(uuid.uuid4()), - assembly, - references=references) - dba = metadb.registerDBArray( - guid=str(uuid.uuid4()), - name=array, - reference_set_id=rs.id, - workspace_id=ws.id) - vs = metadb.registerVariantSet( - guid=str(uuid.uuid4()), - reference_set_id=rs.id, - dataset_id=os.path.basename(workspace)) - - return dba, vs, rs + dba, vs, rs, fs = metadb.queryDBArray(workspace, array) + if dba is None: + # register workspace, referenceset, array, and variantset + ws = metadb.registerWorkspace( + str(uuid.uuid4()), workspace) + rs = metadb.registerReferenceSet( + str(uuid.uuid4()), + assembly, + references=((reader is not None) ? 
reader.contigs : None) + fs = metadb.registerFieldSet( + str(uuid.uuid4()), reader, assembly) + dba = metadb.registerDBArray( + guid=str(uuid.uuid4()), + name=array, + reference_set_id=rs.id, + field_set_id=fs.id, + workspace_id=ws.id) + vs = metadb.registerVariantSet( + guid=str(uuid.uuid4()), + reference_set_id=rs.id, + dataset_id=os.path.basename(workspace)) + + return dba, vs, rs, fs def createMappingFiles(outputDir, callset_mapping, rs_id, DB_URI, array, loader_config=None): diff --git a/utils/vcf_importer.py b/utils/vcf_importer.py index f2f358b..ce2c180 100644 --- a/utils/vcf_importer.py +++ b/utils/vcf_importer.py @@ -81,9 +81,7 @@ def __enter__(self): self.source_idx = conf.get('source_idx', 0) self.target_idx = conf.get('target_idx', 1) - self.array, self.variantset, self.referenceset = helper.registerWithMetadb( - conf, vcf=True, references=self.reader.contigs) - + self.array, self.variantset, self.referenceset, self.fieldset = helper.registerWithMetadb(conf, self.reader, vcf=True) return self def __exit__(self, exc_type, exc_value, traceback): @@ -260,7 +258,6 @@ def poolImportVCF(file_info): return (-1, inputFile) return (0, inputFile, vc.callset_mapping) - def parallelGen(config_file, inputFileList, outputDir, callset_file=None, loader_config=None): """ Spawns the Pool of VCF objects to work on each input VCF @@ -273,7 +270,7 @@ def parallelGen(config_file, inputFileList, outputDir, callset_file=None, loader # if no contig information in header, assume it's already registered with open(inputFileList[0], 'rb') as vcf_init: reader = vcf.Reader(vcf_init) - dba, vs, rs = helper.registerWithMetadb(config, vcf=True, references=reader.contigs) + dba, vs, rs, fs = helper.registerWithMetadb(config, reader, vcf=True) # sort and index vcfs, and # build arguments for parallel gen @@ -296,6 +293,7 @@ def parallelGen(config_file, inputFileList, outputDir, callset_file=None, loader pool = Pool() failed = list() + for returncode in pool.imap_unordered(poolImportVCF, function_args): if returncode[0] == -1: failed.append(returncode[1])
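
---
Reviewer note, not part of the patch: a minimal sketch of how the new pieces compose, for anyone validating this change by hand. It assumes a reachable metadb instance and a PyVCF-readable file; DB_URI, the workspace path, the 'hg19' assembly name, example.vcf.gz, and the DBImport import path are illustrative assumptions, not names introduced by this diff.

    import uuid
    import vcf
    from metadb.api.dbimport import DBImport  # import path assumed, mirrors utils/helper.py usage

    DB_URI = 'postgresql+psycopg2://user:pass@localhost/metadb'  # placeholder

    with open('example.vcf.gz', 'rb') as stream:
        reader = vcf.Reader(stream)
        with DBImport(DB_URI).getSession() as metadb:
            # One Field row per FILTER/FORMAT/INFO header entry; entries that
            # share a name across header sections are merged into a single row.
            fs = metadb.registerFieldSet(str(uuid.uuid4()), reader=reader,
                                         description='example field set')
            ws = metadb.registerWorkspace(str(uuid.uuid4()), '/example/workspace')
            rs = metadb.registerReferenceSet(str(uuid.uuid4()), 'hg19',
                                             references=reader.contigs)
            # Arrays now record the field set they were imported with.
            dba = metadb.registerDBArray(guid=str(uuid.uuid4()),
                                         name='example_array',
                                         reference_set_id=rs.id,
                                         field_set_id=fs.id,
                                         workspace_id=ws.id)
            # Later imports can recover the objects from (workspace, array);
            # vs comes back None here because no variant set was registered above.
            dba, vs, rs, fs = metadb.queryDBArray('/example/workspace', 'example_array')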