Skip to content
This repository has been archived by the owner on Jan 24, 2018. It is now read-only.

speed ups to rna quant ingest #1564

Merged
merged 4 commits into from
Feb 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion constraints.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@
#
git+git://github.com/ga4gh/ga4gh-common.git@master#egg=ga4gh_common
git+git://github.com/ga4gh/schemas.git@master#egg=ga4gh_schemas
git+git://github.com/david4096/ga4gh-client.git@unset#egg=ga4gh_client
git+git://github.com/ga4gh/ga4gh-client.git@master#egg=ga4gh_client
70 changes: 52 additions & 18 deletions ga4gh/server/repo/rnaseq2ga.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import sqlite3
import csv
import uuid

import ga4gh.server.exceptions as exceptions

Expand Down Expand Up @@ -35,7 +34,7 @@ def createTables(self):
programs TEXT,
biosample_id TEXT)''')
self._cursor.execute('''CREATE TABLE Expression (
id TEXT NOT NULL PRIMARY KEY,
id INTEGER,
rna_quantification_id TEXT,
name TEXT,
feature_id TEXT,
Expand All @@ -45,7 +44,8 @@ def createTables(self):
score REAL,
units INTEGER,
conf_low REAL,
conf_hi REAL)''')
conf_hi REAL,
PRIMARY KEY (id, rna_quantification_Id))''')
self._dbConn.commit()

def addRNAQuantification(self, datafields):
Expand Down Expand Up @@ -83,6 +83,22 @@ def batchAddExpression(self):
self._dbConn.commit()
self._expressionValueList = []

def createIndices(self):
"""
Index columns that are queried. The expression index can
take a long time.
"""

sql = '''CREATE INDEX feature_id_index
ON Expression (feature_id)'''
self._cursor.execute(sql)
self._dbConn.commit()

sql = '''CREATE INDEX expression_index
ON Expression (expression)'''
self._cursor.execute(sql)
self._dbConn.commit()


class AbstractWriter(object):
"""
Expand All @@ -109,6 +125,16 @@ def setUnits(self, units):
elif units == "tpm":
self._units = 2

def setColNum(self, header, name, defaultNum=None):
colNum = defaultNum
try:
colNum = header.index(name)
except:
if defaultNum is None:
raise exceptions.RepoManagerException(
"Missing {} column in expression table.".format(name))
return colNum

def writeExpression(self, rnaQuantificationId, quantfilename,
featureSetNames=None):
"""
Expand All @@ -124,24 +150,31 @@ def writeExpression(self, rnaQuantificationId, quantfilename,
featureSets.append(
self._dataset.getFeatureSetByName(annotationName))
with open(quantfilename, "r") as quantFile:
quantificationReader = csv.DictReader(quantFile, delimiter=b"\t")
quantificationReader = csv.reader(quantFile, delimiter=b"\t")
header = next(quantificationReader)
expressionLevelColNum = self.setColNum(
header, self._expressionLevelCol)
nameColNum = self.setColNum(header, self._nameCol)
countColNum = self.setColNum(header, self._countCol, -1)
confColLowNum = self.setColNum(header, self._confColLow, -1)
confColHiNum = self.setColNum(header, self._confColHi, -1)
featureColNum = self.setColNum(header, self._featureCol)
expressionId = 0
for expression in quantificationReader:
expressionLevel = expression[self._expressionLevelCol]
expressionId = str(uuid.uuid4())
name = expression[self._nameCol]
expressionLevel = expression[expressionLevelColNum]
name = expression[nameColNum]
rawCount = 0.0
if self._countCol in expression.keys():
rawCount = expression[self._countCol]
if countColNum != -1:
rawCount = expression[countColNum]
confidenceLow = 0.0
confidenceHi = 0.0
score = 0.0
if (self._confColLow in expression.keys() and
self._confColHi in expression.keys()):
confidenceLow = float(expression[self._confColLow])
confidenceHi = float(expression[self._confColHi])
if confColLowNum != -1 and confColHiNum != -1:
confidenceLow = float(expression[confColLowNum])
confidenceHi = float(expression[confColHiNum])
score = (confidenceLow + confidenceHi)/2

featureName = expression[self._featureCol]
featureName = expression[featureColNum]
featureId = ""
if featureSets is not None:
for featureSet in featureSets:
Expand All @@ -157,6 +190,7 @@ def writeExpression(self, rnaQuantificationId, quantfilename,
rawCount, score, units, confidenceLow,
confidenceHi)
self._db.addExpression(datafields)
expressionId += 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool! Do you think this same idea will work for adding expressions in parallel (ignoring sqlite)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, since each file will be processed within it's own process/thread.

self._db.batchAddExpression()


Expand Down Expand Up @@ -257,8 +291,9 @@ def writeExpressionTable(writer, data, featureSetNames=None):


def rnaseq2ga(quantificationFilename, sqlFilename, localName, rnaType,
dataset=None, featureType="gene", description="", programs="",
featureSetNames="", readGroupSetNames="", biosampleId=""):
dataset=None, featureType="gene",
description="", programs="", featureSetNames="",
readGroupSetNames="", biosampleId=""):
"""
Reads RNA Quantification data in one of several formats and stores the data
in a sqlite database for use by the GA4GH reference server.
Expand Down Expand Up @@ -292,8 +327,7 @@ def rnaseq2ga(quantificationFilename, sqlFilename, localName, rnaType,
writer = KallistoWriter(rnaDB, featureType, dataset=dataset)
elif rnaType == "rsem":
writer = RsemWriter(rnaDB, featureType, dataset=dataset)
writeRnaseqTable(rnaDB, [localName], description,
featureSetIds,
writeRnaseqTable(rnaDB, [localName], description, featureSetIds,
readGroupId=readGroupIds, programs=programs,
biosampleId=biosampleId)
writeExpressionTable(
Expand Down
24 changes: 24 additions & 0 deletions tests/datadriven/test_rna_quantification.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
from __future__ import print_function
from __future__ import unicode_literals

import tempfile
import os
import shutil

import ga4gh.server.datarepo as datarepo
import ga4gh.server.repo.rnaseq2ga as rnaseq2ga
import ga4gh.server.datamodel as datamodel
import ga4gh.server.datamodel.datasets as datasets
import ga4gh.server.datamodel.references as references
Expand Down Expand Up @@ -175,3 +180,22 @@ def testSearchExpressionLevelsWithFeatureIds(self):
self.assertEqual(
_expressionTestData["num_expression_entries"],
len(expressionLevels))

def testLoadRsemData(self):
"""
Test ingest of rsem data.
"""
tempDir = tempfile.mkdtemp(prefix="ga4gh_rna_quant",
dir=tempfile.gettempdir())
dbName = os.path.join(tempDir, "rnaQuantDB")
storeDb = rnaseq2ga.RnaSqliteStore(dbName)
storeDb.createTables()

testTsvFile = os.path.join(
paths.testDataDir,
"datasets/dataset1/rnaQuant/rsem_test_data.tsv")
Copy link
Member

@david4096 david4096 Feb 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is really just guaranteeing the whole this doesn't go belly up, which is a great improvement! Also makes me think we could add it to the https://github.com/ga4gh/server/blob/master/scripts/build_test_data.py

rnaQuantId = "rqsId"
rnaseq2ga.rnaseq2ga(testTsvFile, dbName, rnaQuantId,
'rsem', featureType="gene")

shutil.rmtree(tempDir)
2 changes: 1 addition & 1 deletion tests/unit/test_oidc.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def setUpClass(cls):
"DATA_SOURCE": "simulated://",
"OIDC_CLIENT_ID": "123",
"OIDC_CLIENT_SECRET": RANDSTR,
"OIDC_PROVIDER": "http://auth.com",
"OIDC_PROVIDER": "https://accounts.example.com",
"SECRET_KEY": "secret"
# "DEBUG" : True
}
Expand Down