Skip to content

Commit

Permalink
Add postgres compatibility in HiveTableLastUpdatedExtractor
Browse files Browse the repository at this point in the history
Signed-off-by: Tony Chou <tcheon8788@gmail.com>
  • Loading branch information
chonyy committed Jun 8, 2022
1 parent e8fee8c commit bdc06cc
Showing 1 changed file with 54 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from pyhocon import ConfigFactory, ConfigTree
from pytz import UTC
from sqlalchemy.engine.url import make_url

from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
Expand Down Expand Up @@ -58,7 +59,7 @@ class HiveTableLastUpdatedExtractor(Extractor):
timestamp.
"""
PARTITION_TABLE_SQL_STATEMENT = """
DEFAULT_PARTITION_TABLE_SQL_STATEMENT = """
SELECT
DBS.NAME as `schema`,
TBL_NAME as table_name,
Expand All @@ -71,7 +72,20 @@ class HiveTableLastUpdatedExtractor(Extractor):
ORDER BY `schema`, table_name;
"""

NON_PARTITIONED_TABLE_SQL_STATEMENT = """
DEFAULT_POSTGRES_PARTITION_TABLE_SQL_STATEMENT = """
SELECT
d."NAME" as "schema",
t."TBL_NAME" as table_name,
MAX(p."CREATE_TIME") as last_updated_time
FROM "TBLS" t
JOIN "DBS" d ON t."DB_ID" = d."DB_ID"
JOIN "PARTITIONS" p ON t."TBL_ID" = p."TBL_ID"
{where_clause_suffix}
GROUP BY "schema", table_name
ORDER BY "schema", table_name;
"""

DEFAULT_NON_PARTITIONED_TABLE_SQL_STATEMENT = """
SELECT
DBS.NAME as `schema`,
TBL_NAME as table_name,
Expand All @@ -83,11 +97,27 @@ class HiveTableLastUpdatedExtractor(Extractor):
ORDER BY `schema`, table_name;
"""

DEFAULT_POSTGRES_NON_PARTITIONED_TABLE_SQL_STATEMENT = """
SELECT
d."NAME" as "schema",
t."TBL_NAME" as table_name,
s."LOCATION" as location
FROM "TBLS" t
JOIN "DBS" d ON t."DB_ID" = d."DB_ID"
JOIN "SDS" s ON t."SD_ID" = s."SD_ID"
{where_clause_suffix}
ORDER BY "schema", table_name;
"""

# Additional where clause for non partitioned table SQL
ADDTIONAL_WHERE_CLAUSE = """ NOT EXISTS (SELECT * FROM PARTITIONS WHERE PARTITIONS.TBL_ID = TBLS.TBL_ID)
DEFAULT_ADDTIONAL_WHERE_CLAUSE = """ NOT EXISTS (SELECT * FROM PARTITIONS WHERE PARTITIONS.TBL_ID = TBLS.TBL_ID)
AND NOT EXISTS (SELECT * FROM PARTITION_KEYS WHERE PARTITION_KEYS.TBL_ID = TBLS.TBL_ID)
"""

DEFAULT_POSTGRES_ADDTIONAL_WHERE_CLAUSE = """ NOT EXISTS (SELECT * FROM "PARTITIONS" p WHERE p."TBL_ID" = t."TBL_ID")
AND NOT EXISTS (SELECT * FROM "PARTITION_KEYS" pk WHERE pk."TBL_ID" = t."TBL_ID")
"""

DATABASE = 'hive'

# CONFIG KEYS
Expand Down Expand Up @@ -132,7 +162,7 @@ def _get_partitioned_table_sql_alchemy_extractor(self) -> Extractor:
:return: SQLAlchemyExtractor
"""

sql_stmt = self.PARTITION_TABLE_SQL_STATEMENT.format(
sql_stmt = self._choose_default_partitioned_sql_stm().format(
where_clause_suffix=self._conf.get_string(
self.PARTITIONED_TABLE_WHERE_CLAUSE_SUFFIX_KEY, ' '))

Expand All @@ -144,24 +174,34 @@ def _get_partitioned_table_sql_alchemy_extractor(self) -> Extractor:
sql_alchemy_extractor.init(sql_alchemy_conf)
return sql_alchemy_extractor

def _choose_default_partitioned_sql_stm(self) -> str:
    """
    Pick the default partitioned-table SQL statement that matches the metastore backend.

    The backend is read from the connection string stored under
    "extractor.sqlalchemy.conn_string" in this extractor's config.
    :return: the Postgres statement (double-quoted identifiers) when the backend is PostgreSQL,
    otherwise the default statement (backtick-quoted identifiers)
    """
    url = make_url(self._conf.get_string("extractor.sqlalchemy.conn_string"))
    # url.drivername may carry a DBAPI suffix (e.g. "postgresql+psycopg2"), which would never
    # equal "postgresql". get_backend_name() returns only the part before the "+".
    if url.get_backend_name().lower() in ('postgresql', 'postgres'):
        return self.DEFAULT_POSTGRES_PARTITION_TABLE_SQL_STATEMENT
    return self.DEFAULT_PARTITION_TABLE_SQL_STATEMENT

def _get_non_partitioned_table_sql_alchemy_extractor(self) -> Extractor:
"""
Getting an SQLAlchemy extractor that extracts storage location for non-partitioned table for further probing
last updated timestamp
:return: SQLAlchemyExtractor
"""

default_sql_stmt, default_additional_where_clause = self._choose_default_non_partitioned_sql_stm()

if self.NON_PARTITIONED_TABLE_WHERE_CLAUSE_SUFFIX_KEY in self._conf:
where_clause_suffix = """
{}
AND {}
""".format(self._conf.get_string(
self.NON_PARTITIONED_TABLE_WHERE_CLAUSE_SUFFIX_KEY),
self.ADDTIONAL_WHERE_CLAUSE)
default_additional_where_clause)
else:
where_clause_suffix = 'WHERE {}'.format(self.ADDTIONAL_WHERE_CLAUSE)
where_clause_suffix = 'WHERE {}'.format(default_additional_where_clause)

sql_stmt = self.NON_PARTITIONED_TABLE_SQL_STATEMENT.format(
sql_stmt = default_sql_stmt.format(
where_clause_suffix=where_clause_suffix)

LOGGER.info('SQL for non-partitioned table against Hive metastore: %s', sql_stmt)
Expand All @@ -172,6 +212,13 @@ def _get_non_partitioned_table_sql_alchemy_extractor(self) -> Extractor:
sql_alchemy_extractor.init(sql_alchemy_conf)
return sql_alchemy_extractor

def _choose_default_non_partitioned_sql_stm(self) -> List[str]:
    """
    Pick the default non-partitioned-table SQL statement and its additional where clause
    for the metastore backend.

    The backend is read from the connection string stored under
    "extractor.sqlalchemy.conn_string" in this extractor's config.
    :return: a two-element list [sql_statement, additional_where_clause]; the Postgres pair when
    the backend is PostgreSQL, otherwise the default pair
    """
    url = make_url(self._conf.get_string("extractor.sqlalchemy.conn_string"))
    # url.drivername may carry a DBAPI suffix (e.g. "postgresql+psycopg2"), which would never
    # equal "postgresql". get_backend_name() returns only the part before the "+".
    if url.get_backend_name().lower() in ('postgresql', 'postgres'):
        return [
            self.DEFAULT_POSTGRES_NON_PARTITIONED_TABLE_SQL_STATEMENT,
            self.DEFAULT_POSTGRES_ADDTIONAL_WHERE_CLAUSE,
        ]
    return [
        self.DEFAULT_NON_PARTITIONED_TABLE_SQL_STATEMENT,
        self.DEFAULT_ADDTIONAL_WHERE_CLAUSE,
    ]

def _get_filesystem(self) -> FileSystem:
fs = FileSystem()
fs.init(Scoped.get_scoped_conf(self._conf, fs.get_scope()))
Expand Down

0 comments on commit bdc06cc

Please sign in to comment.