feat: Vertica metadata extractor (amundsen-io#433)
* Adding metadata extractor for Vertica

Adding a metadata extractor for Vertica, using the MySQL extractor as a template

Signed-off-by: Ashutosh Sanzgiri <sanzgiri@gmail.com>

* renaming appropriately

Signed-off-by: Ashutosh Sanzgiri <sanzgiri@gmail.com>

* reordering imports

Signed-off-by: Ashutosh Sanzgiri <sanzgiri@gmail.com>

* add docs for vertica extractor

Signed-off-by: Ashutosh Sanzgiri <sanzgiri@gmail.com>

* fix lint errors

Signed-off-by: Ashutosh Sanzgiri <sanzgiri@gmail.com>

* fix isort errors

Signed-off-by: Ashutosh Sanzgiri <sanzgiri@gmail.com>

* fix isort errors

Signed-off-by: Ashutosh Sanzgiri <sanzgiri@gmail.com>

* fix isort errors

Signed-off-by: Ashutosh Sanzgiri <sanzgiri@gmail.com>
sanzgiri authored and Wonong committed Mar 4, 2021
1 parent f022468 commit f4cf2e6
Showing 3 changed files with 335 additions and 0 deletions.
9 changes: 9 additions & 0 deletions README.md
@@ -444,6 +444,15 @@ job = DefaultJob(
job.launch()
```

#### [VerticaMetadataExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/vertica_metadata_extractor.py "VerticaMetadataExtractor")
An extractor that pulls table and column metadata (database, schema, table name, column name, and column data type) from a Vertica database.

A sample loading script for Vertica is provided [here](https://github.com/amundsen-io/amundsendatabuilder/blob/master/example/scripts/sample_vertica_loader.py).

By default the cluster name comes from the `cluster_key` config value (which defaults to `master`); setting `use_catalog_as_cluster_name` to `True` uses the Vertica database (catalog) name as the cluster instead. The `where_clause_suffix` config can be used to restrict which schemas are queried, as shown in the sketch below.
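
A minimal job configuration is sketched below; the connection string, schema filter, and cluster name are placeholder values, and the Neo4j/Elasticsearch publisher settings shown in the full sample script are omitted for brevity.

```python
from pyhocon import ConfigFactory

from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.extractor.vertica_metadata_extractor import VerticaMetadataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.task.task import DefaultTask

# Placeholder connection details -- replace with your own Vertica host and credentials.
vertica_conn_string = 'vertica+vertica_python://username:password@vertica.host:5433/vertica'

job_config = ConfigFactory.from_dict({
    # Only extract tables from the 'public' schema
    'extractor.vertica_metadata.{}'.format(VerticaMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY):
        "where c.table_schema = 'public'",
    'extractor.vertica_metadata.{}'.format(VerticaMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): False,
    'extractor.vertica_metadata.{}'.format(VerticaMetadataExtractor.CLUSTER_KEY): 'vertica_cluster',
    'extractor.vertica_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
        vertica_conn_string,
    # Where the loader writes node and relationship CSVs
    'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): '/var/tmp/amundsen/nodes',
    'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): '/var/tmp/amundsen/relationships',
})

job = DefaultJob(conf=job_config,
                 task=DefaultTask(extractor=VerticaMetadataExtractor(), loader=FsNeo4jCSVLoader()))
job.launch()
```

Publishing the resulting CSVs to Neo4j and indexing them into Elasticsearch follows the same pattern as the other extractors and is shown end to end in the sample script linked above.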



#### [SQLAlchemyExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/sql_alchemy_extractor.py "SQLAlchemyExtractor")
An extractor that utilizes [SQLAlchemy](https://www.sqlalchemy.org/ "SQLAlchemy") to extract records from any database that supports SQLAlchemy.
```python
140 changes: 140 additions & 0 deletions databuilder/extractor/vertica_metadata_extractor.py
@@ -0,0 +1,140 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

import logging
from collections import namedtuple
from itertools import groupby
from typing import (
Any, Dict, Iterator, Union,
)

from pyhocon import ConfigFactory, ConfigTree

from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import ColumnMetadata, TableMetadata

TableKey = namedtuple('TableKey', ['schema', 'table_name'])

LOGGER = logging.getLogger(__name__)


class VerticaMetadataExtractor(Extractor):
    """
    Extracts Vertica table and column metadata from the underlying meta store database using SQLAlchemyExtractor.
    V_CATALOG does not have table and column description columns.
    The CLUSTER_KEY config parameter is used as the cluster name.
    Not distinguishing between table & view for now.
    """
    # SELECT statement from vertica information_schema to extract table and column metadata
    SQL_STATEMENT = """
    SELECT
    lower(c.column_name) AS col_name,
    lower(c.data_type) AS col_type,
    c.ordinal_position AS col_sort_order,
    {cluster_source} AS cluster,
    lower(c.table_schema) AS "schema",
    lower(c.table_name) AS name,
    False AS is_view
    FROM
    V_CATALOG.COLUMNS AS c
    LEFT JOIN
    V_CATALOG.TABLES t
    ON c.TABLE_NAME = t.TABLE_NAME
    AND c.TABLE_SCHEMA = t.TABLE_SCHEMA
    {where_clause_suffix}
    ORDER by cluster, "schema", name, col_sort_order ;
    """

    # CONFIG KEYS
    WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
    CLUSTER_KEY = 'cluster_key'
    USE_CATALOG_AS_CLUSTER_NAME = 'use_catalog_as_cluster_name'
    DATABASE_KEY = 'database_key'

    # Default values
    DEFAULT_CLUSTER_NAME = 'master'

    DEFAULT_CONFIG = ConfigFactory.from_dict(
        {WHERE_CLAUSE_SUFFIX_KEY: ' ', CLUSTER_KEY: DEFAULT_CLUSTER_NAME, USE_CATALOG_AS_CLUSTER_NAME: False}
    )

    def init(self, conf: ConfigTree) -> None:
        conf = conf.with_fallback(VerticaMetadataExtractor.DEFAULT_CONFIG)
        self._cluster = '{}'.format(conf.get_string(VerticaMetadataExtractor.CLUSTER_KEY))

        if conf.get_bool(VerticaMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME):
            cluster_source = "c.table_catalog"
        else:
            cluster_source = "'{}'".format(self._cluster)

        self._database = conf.get_string(VerticaMetadataExtractor.DATABASE_KEY, default='vertica')

        self.sql_stmt = VerticaMetadataExtractor.SQL_STATEMENT.format(
            where_clause_suffix=conf.get_string(VerticaMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY),
            cluster_source=cluster_source
        )

        self._alchemy_extractor = SQLAlchemyExtractor()
        sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
            .with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))

        self.sql_stmt = sql_alch_conf.get_string(SQLAlchemyExtractor.EXTRACT_SQL)

        LOGGER.info('SQL for vertica metadata: {}'.format(self.sql_stmt))

        self._alchemy_extractor.init(sql_alch_conf)
        self._extract_iter: Union[None, Iterator] = None

    def extract(self) -> Union[TableMetadata, None]:
        if not self._extract_iter:
            self._extract_iter = self._get_extract_iter()
        try:
            return next(self._extract_iter)
        except StopIteration:
            return None

    def get_scope(self) -> str:
        return 'extractor.vertica_metadata'

    def _get_extract_iter(self) -> Iterator[TableMetadata]:
        """
        Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata
        :return:
        """
        for key, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
            columns = []

            for row in group:
                last_row = row
                columns.append(ColumnMetadata(row['col_name'], None,
                                              row['col_type'], row['col_sort_order']))

            yield TableMetadata(self._database, last_row['cluster'],
                                last_row['schema'],
                                last_row['name'],
                                None,
                                columns,
                                is_view=last_row['is_view'])

    def _get_raw_extract_iter(self) -> Iterator[Dict[str, Any]]:
        """
        Provides iterator of result row from SQLAlchemy extractor
        :return:
        """
        row = self._alchemy_extractor.extract()
        while row:
            yield row
            row = self._alchemy_extractor.extract()

    def _get_table_key(self, row: Dict[str, Any]) -> Union[TableKey, None]:
        """
        Table key consists of schema and table name
        :param row:
        :return:
        """
        if row:
            return TableKey(schema=row['schema'], table_name=row['name'])

        return None
186 changes: 186 additions & 0 deletions example/scripts/sample_vertica_loader.py
@@ -0,0 +1,186 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

"""
This is a example script which demo how to load data
into Neo4j and Elasticsearch without using an Airflow DAG.
"""

import os
import sys
import textwrap
import uuid

from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory
from sqlalchemy.ext.declarative import declarative_base

from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.extractor.vertica_metadata_extractor import VerticaMetadataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer

es_host = os.getenv('CREDENTIALS_ELASTICSEARCH_PROXY_HOST', 'localhost')
neo_host = os.getenv('CREDENTIALS_NEO4J_PROXY_HOST', 'localhost')

es_port = os.getenv('CREDENTIALS_ELASTICSEARCH_PROXY_PORT', 9200)
neo_port = os.getenv('CREDENTIALS_NEO4J_PROXY_PORT', 7687)

if len(sys.argv) > 1:
    es_host = sys.argv[1]
if len(sys.argv) > 2:
    neo_host = sys.argv[2]

es = Elasticsearch([
    {'host': es_host, 'port': es_port},
])

DB_FILE = '/tmp/test.db'
SQLITE_CONN_STRING = 'sqlite:////tmp/test.db'
Base = declarative_base()

NEO4J_ENDPOINT = 'bolt://{}:{}'.format(neo_host, neo_port)

neo4j_endpoint = NEO4J_ENDPOINT

neo4j_user = 'neo4j'
neo4j_password = 'test'


# specify vertica access credentials, host server, port (default 5433),
# database name (default 'vertica')
def connection_string():
    user = 'username'
    password = 'password'
    host = 'vertica-budget.host'
    port = '5433'
    db = 'vertica'
    return "vertica+vertica_python://%s:%s@%s:%s/%s" % (user, password, host, port, db)


# provide schemas to run extraction on (default 'public')
def run_vertica_job():
    where_clause_suffix = textwrap.dedent("""
        where c.table_schema = 'public'
    """)

    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = '{tmp_folder}/nodes/'.format(tmp_folder=tmp_folder)
    relationship_files_folder = '{tmp_folder}/relationships/'.format(tmp_folder=tmp_folder)

    job_config = ConfigFactory.from_dict({
        'extractor.vertica_metadata.{}'.format(VerticaMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY):
            where_clause_suffix,
        'extractor.vertica_metadata.{}'.format(VerticaMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME):
            False,
        'extractor.vertica_metadata.{}'.format(VerticaMetadataExtractor.CLUSTER_KEY):
            'vertica_budget',
        'extractor.vertica_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
            connection_string(),
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH):
            node_files_folder,
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH):
            relationship_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR):
            node_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR):
            relationship_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY):
            neo4j_endpoint,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER):
            neo4j_user,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD):
            neo4j_password,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG):
            'unique_tag',  # should use unique tag here like {ds}
    })
    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=VerticaMetadataExtractor(), loader=FsNeo4jCSVLoader()),
                     publisher=Neo4jCsvPublisher())
    return job


def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
                                   elasticsearch_doc_type_key='table',
                                   model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
                                   cypher_query=None,
                                   elasticsearch_mapping=None):
    """
    :param elasticsearch_index_alias: alias for Elasticsearch used in
           amundsensearchlibrary/search_service/config.py as an index
    :param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
           `table_search_index`
    :param model_name: the Databuilder model class used in transporting between Extractor and Loader
    :param cypher_query: Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
           it uses the `Table` query baked into the Extractor
    :param elasticsearch_mapping: Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
           if None is given (default) it uses the `Table` query baked into the Publisher
    """
    # loader saves data to this location and publisher reads it from here
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                       extractor=Neo4jSearchDataExtractor(),
                       transformer=NoopTransformer())

    # elastic search client instance
    elasticsearch_client = es
    # unique name of new index in Elasticsearch
    elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())

    job_config = ConfigFactory.from_dict({
        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): model_name,
        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password,
        'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
            extracted_search_data_path,
        'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
            extracted_search_data_path,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
            elasticsearch_client,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
            elasticsearch_new_index_key,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
            elasticsearch_doc_type_key,
        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
            elasticsearch_index_alias,
    })

    # only optionally add these keys, so need to dynamically `put` them
    if cypher_query:
        job_config.put('extractor.search_data.{}'.format(Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY),
                       cypher_query)
    if elasticsearch_mapping:
        job_config.put('publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY),
                       elasticsearch_mapping)

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=ElasticsearchPublisher())
    return job


if __name__ == "__main__":
    # Uncomment next line to get INFO level logging
    # logging.basicConfig(level=logging.INFO)

    loading_job = run_vertica_job()
    loading_job.launch()

    job_es_table = create_es_publisher_sample_job(
        elasticsearch_index_alias='table_search_index',
        elasticsearch_doc_type_key='table',
        model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
    job_es_table.launch()
