feat: kafka schema registry integration (#1959)
Signed-off-by: Farbod Ahmadian <farbodahmadian2014@gmail.com>
farbodahm authored Aug 18, 2022
1 parent 8641bef commit ca4a048
Showing 5 changed files with 435 additions and 1 deletion.
1 change: 1 addition & 0 deletions README.md
@@ -125,6 +125,7 @@ Please visit [Architecture](https://www.amundsen.io/amundsen/architecture/) for
- [Elasticsearch](https://www.elastic.co/)
- [Google BigQuery](https://cloud.google.com/bigquery)
- [IBM DB2](https://www.ibm.com/analytics/db2)
- [Kafka Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html)
- [Microsoft SQL Server](https://www.microsoft.com/en-us/sql-server/default.aspx)
- [MySQL](https://www.mysql.com/)
- [Oracle](https://www.oracle.com/index.html) (through dbapi or sql_alchemy)
22 changes: 22 additions & 0 deletions databuilder/README.md
@@ -1655,6 +1655,28 @@ job = DefaultJob(
job.launch()
```

#### [KafkaSchemaRegistryExtractor](https://github.com/amundsen-io/amundsen/blob/main/databuilder/databuilder/extractor/kafka_schema_registry_extractor.py "KafkaSchemaRegistryExtractor")

An extractor that extracts schema metadata from a Confluent Kafka Schema Registry in Avro format, producing table and column metadata for the latest schema version of each subject.

A sample job config is shown below.

```python
from pyhocon import ConfigFactory

from databuilder.extractor.kafka_schema_registry_extractor import KafkaSchemaRegistryExtractor
from databuilder.job.job import DefaultJob
from databuilder.task.task import DefaultTask

job_config = ConfigFactory.from_dict({
    f"extractor.kafka_schema_registry.{KafkaSchemaRegistryExtractor.REGISTRY_URL_KEY}": "http://localhost:8081",
    f"extractor.kafka_schema_registry.{KafkaSchemaRegistryExtractor.REGISTRY_USERNAME_KEY}": "username",
    f"extractor.kafka_schema_registry.{KafkaSchemaRegistryExtractor.REGISTRY_PASSWORD_KEY}": "password",
})
job = DefaultJob(
    conf=job_config,
    task=DefaultTask(
        extractor=KafkaSchemaRegistryExtractor(),
        loader=AnyLoader()))  # AnyLoader is a placeholder; use the loader of your choice, e.g. FsNeo4jCSVLoader
job.launch()
```
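
For a sense of what the extractor emits, here is a hypothetical Avro value schema together with the column types derived from it by the extractor's type-mapping logic (`_get_property_type`); the schema, field, and subject names are made up for illustration.

```python
# Hypothetical Avro schema, as returned by the registry for some subject -- illustration only.
purchase_schema = {
    "type": "record",
    "name": "Purchase",                      # becomes the table name
    "namespace": "ecommerce",                # becomes the cluster name
    "doc": "A single purchase event",        # becomes the table description
    "fields": [
        {"name": "id", "type": "string"},                                  # column type: string
        {"name": "amount", "type": ["null", "double"]},                    # union -> null|double
        {"name": "items", "type": {"type": "array",
                                   "items": {"type": "string"}}},          # -> array<string>
        {"name": "buyer", "type": {"type": "record", "name": "Buyer",
                                   "fields": [{"name": "email",
                                               "type": "string"}]}},       # -> Buyer:struct<email:string>
    ],
}
# The resulting TableMetadata uses database='kafka_schema_registry',
# cluster='ecommerce', schema=<subject name>, name='Purchase',
# with one ColumnMetadata per field above.
```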

**Note: `registry_username` and `registry_password` are optional. Provide them only if your Schema Registry requires authentication.**
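
If the registry is open (no authentication), a configuration with only the registry URL is enough; a minimal sketch (the URL is just an example):

```python
# Minimal configuration without credentials -- the extractor only requires the registry URL.
job_config = ConfigFactory.from_dict({
    f"extractor.kafka_schema_registry.{KafkaSchemaRegistryExtractor.REGISTRY_URL_KEY}":
        "http://localhost:8081",
})
```

The extractor's dependency is published as an optional extra, so it can be installed with `pip install 'amundsen-databuilder[schema_registry]'` (see the `setup.py` change below).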

## List of transformers

Transformers are implemented by subclassing [Transformer](https://github.com/amundsen-io/amundsen/blob/main/databuilder/databuilder/transformer/base_transformer.py#L12 "Transformer") and implementing `transform(self, record)`. A transformer can:
181 changes: 181 additions & 0 deletions databuilder/databuilder/extractor/kafka_schema_registry_extractor.py
@@ -0,0 +1,181 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import (
    Any, Dict, Iterator, List, Optional, Union,
)

from pyhocon import ConfigTree
from schema_registry.client import Auth, SchemaRegistryClient
from schema_registry.client.utils import SchemaVersion

from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_metadata import ColumnMetadata, TableMetadata

LOGGER = logging.getLogger(__name__)


class KafkaSchemaRegistryExtractor(Extractor):
    """
    Extracts the latest version of all schemas from a given
    Kafka Schema Registry URL
    """

    REGISTRY_URL_KEY = "registry_url"
    REGISTRY_USERNAME_KEY = "registry_username"
    REGISTRY_PASSWORD_KEY = "registry_password"

    def init(self, conf: ConfigTree) -> None:
        self._registry_base_url = conf.get(
            KafkaSchemaRegistryExtractor.REGISTRY_URL_KEY
        )

        self._registry_username = conf.get(
            KafkaSchemaRegistryExtractor.REGISTRY_USERNAME_KEY, None
        )

        self._registry_password = conf.get(
            KafkaSchemaRegistryExtractor.REGISTRY_PASSWORD_KEY, None
        )

        # Add authentication if username and password are provided
        if all((self._registry_username, self._registry_password)):
            self._client = SchemaRegistryClient(
                url=self._registry_base_url,
                auth=Auth(
                    username=self._registry_username,
                    password=self._registry_password
                )
            )
        else:
            self._client = SchemaRegistryClient(
                url=self._registry_base_url,
            )

        self._extract_iter: Union[None, Iterator] = None

    def extract(self) -> Union[TableMetadata, None]:
        if not self._extract_iter:
            self._extract_iter = self._get_extract_iter()
        try:
            return next(self._extract_iter)
        except StopIteration:
            return None
        except Exception as e:
            LOGGER.error(f'Failed to generate next table: {e}')
            return None

    def get_scope(self) -> str:
        return 'extractor.kafka_schema_registry'

    def _get_extract_iter(self) -> Iterator[TableMetadata]:
        """
        Return an iterator generating TableMetadata for all of the schemas.
        """
        for schema_version in self._get_raw_extract_iter():
            subject = schema_version.subject
            schema = schema_version.schema.raw_schema
            LOGGER.info((f'Subject: {subject}, '
                         f'Schema: {schema}'))

            try:
                yield KafkaSchemaRegistryExtractor._create_table(
                    schema=schema,
                    subject_name=subject,
                    cluster_name=schema.get(
                        'namespace', 'kafka-schema-registry'
                    ),
                    schema_name=schema.get('name', ''),
                    schema_description=schema.get('doc', None),
                )
            except Exception as e:
                LOGGER.warning(f'Failed to generate table for {subject}: {e}')
                continue

    def _get_raw_extract_iter(self) -> Iterator[SchemaVersion]:
        """
        Return an iterator of result rows from the schema registry.
        """
        subjects = self._client.get_subjects()

        LOGGER.info(f'Number of extracted subjects: {len(subjects)}')
        LOGGER.info(f'Extracted subjects: {subjects}')

        for subj in subjects:
            subj_schema = self._client.get_schema(subj)
            LOGGER.info(f'Subject <{subj}> max version: {subj_schema.version}')

            yield subj_schema

    @staticmethod
    def _create_table(
        schema: Dict[str, Any],
        subject_name: str,
        cluster_name: str,
        schema_name: str,
        schema_description: Optional[str],
    ) -> Optional[TableMetadata]:
        """
        Create TableMetadata based on the given schema and names.
        """
        columns: List[ColumnMetadata] = []

        for i, field in enumerate(schema['fields']):
            columns.append(
                ColumnMetadata(
                    name=field['name'],
                    description=field.get('doc', None),
                    col_type=KafkaSchemaRegistryExtractor._get_property_type(
                        field
                    ),
                    sort_order=i,
                )
            )

        return TableMetadata(
            database='kafka_schema_registry',
            cluster=cluster_name,
            schema=subject_name,
            name=schema_name,
            description=schema_description,
            columns=columns,
        )

    @staticmethod
    def _get_property_type(schema: Dict) -> str:
        """
        Return the type of the given schema.
        Also works for nested schema types.
        """
        if 'type' not in schema:
            return 'object'

        if type(schema['type']) is dict:
            return KafkaSchemaRegistryExtractor._get_property_type(
                schema['type']
            )

        # A schema can have multiple types (a union)
        if type(schema['type']) is list:
            return '|'.join(schema['type'])

        if schema['type'] == 'record':
            properties = [
                f"{field['name']}:"
                f"{KafkaSchemaRegistryExtractor._get_property_type(field)}"
                for field in schema.get('fields', {})
            ]
            if len(properties) > 0:
                if 'name' in schema:
                    return schema['name'] + \
                        ':struct<' + ','.join(properties) + '>'
                return 'struct<' + ','.join(properties) + '>'
            return 'struct<object>'
        elif schema['type'] == 'array':
            items = KafkaSchemaRegistryExtractor._get_property_type(
                schema.get("items", {})
            )
            return 'array<' + items + '>'
        else:
            return schema['type']
7 changes: 6 additions & 1 deletion databuilder/setup.py
@@ -95,9 +95,13 @@
'teradatasqlalchemy==17.0.0.0'
]

schema_registry = [
'python-schema-registry-client==2.4.0'
]

all_deps = requirements + requirements_dev + kafka + cassandra + glue + snowflake + athena + \
bigquery + jsonpath + db2 + dremio + druid + spark + feast + neptune + rds \
+ atlas + salesforce + oracle + teradata
+ atlas + salesforce + oracle + teradata + schema_registry

setup(
name='amundsen-databuilder',
@@ -132,6 +136,7 @@
'salesforce': salesforce,
'oracle': oracle,
'teradata': teradata,
'schema_registry': schema_registry,
},
classifiers=[
'Programming Language :: Python :: 3.7',