Skip to content

Commit

Permalink
Source Firebolt: allow reading from views (#16583)
Browse files Browse the repository at this point in the history
* Source Firebolt: allow reading from views

* setting cdk version

* Decimal is now enabled

* Bump sdk version to 0.12

* fix integration tests

* Disabling backwards compatibility test

* address comments:
- revert .pre-commit-config.yaml
- update versions manually under the seed file
- update the doc to reflect the PR link

* fix docs and bump source_specs.yaml

Co-authored-by: Yiyang Li <yiyangli2010@gmail.com>
  • Loading branch information
ptiurin and YiyangLi authored Dec 7, 2022
1 parent 0164003 commit 7b0aecb
Show file tree
Hide file tree
Showing 15 changed files with 144 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1945,7 +1945,7 @@
- name: Firebolt
sourceDefinitionId: 6f2ac653-8623-43c4-8950-19218c7caf3d
dockerRepository: airbyte/source-firebolt
dockerImageTag: 0.1.0
dockerImageTag: 0.2.0
documentationUrl: https://docs.airbyte.com/integrations/sources/firebolt
sourceType: database
releaseStage: alpha
Expand Down
5 changes: 3 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16876,7 +16876,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-firebolt:0.1.0"
- dockerImage: "airbyte/source-firebolt:0.2.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/firebolt"
connectionSpecification:
Expand All @@ -16887,7 +16887,7 @@
- "username"
- "password"
- "database"
additionalProperties: false
additionalProperties: true
properties:
username:
type: "string"
Expand All @@ -16899,6 +16899,7 @@
type: "string"
title: "Password"
description: "Firebolt password."
airbyte_secret: true
account:
type: "string"
title: "Account"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-firebolt/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ COPY source_firebolt ./source_firebolt
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.name=airbyte/source-firebolt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ tests:
status: "failed"
discovery:
- config_path: "secrets/config.json"
backward_compatibility_tests_config:
# 0.1.0 contains queries that overwhelm the API server on this test
disable_for_version: "0.1.0"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ This connector uses [firebolt-sdk](https://pypi.org/project/firebolt-sdk/), whic
## Notes

* External tables are not available as a source for performance reasons.
* Views are not available as a source due to possible complicated structure and non-obvious data types.
* Only Full reads are supported for now.
* Integration/Acceptance testing requires the user to have a running engine. Spinning up an engine can take a while so this ensures a faster iteration on the connector.
* Pagination is not available at the moment, so sufficiently large data sets might cause out-of-memory errors.
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,23 @@ def test_table_name() -> str:


@fixture(scope="module")
def create_test_data(config: Dict[str, str], test_table_name: str) -> Generator[Connection, None, None]:
def test_view_name(test_table_name) -> str:
return f"view_{test_table_name}"


@fixture(scope="module")
def create_test_data(config: Dict[str, str], test_table_name: str, test_view_name: str) -> Generator[Connection, None, None]:
with establish_connection(config, MagicMock()) as connection:
with connection.cursor() as cursor:
cursor.execute(
f"CREATE DIMENSION TABLE {test_table_name} (column1 STRING NULL, column2 INT NULL, column3 DATE NULL, column4 DATETIME NULL, column5 DECIMAL(38, 31) NULL, column6 ARRAY(INT), column7 BOOLEAN NULL)"
)
cursor.execute(
f"INSERT INTO {test_table_name} VALUES ('my_value',221,'2021-01-01','2021-01-01 12:00:01', Null, [1,2,3], true), ('my_value2',null,'2021-01-02','2021-01-02 12:00:02','1231232.123459999990457054844258706536', [1,2,3], null)"
f"INSERT INTO {test_table_name} VALUES ('my_value',221,'2021-01-01','2021-01-01 12:00:01', Null, [1,2,3], true), ('my_value2',null,'2021-01-02','2021-01-02 12:00:02','1231232.1234599999904570548442587065362', [1,2,3], null)"
)
cursor.execute(f"CREATE VIEW {test_view_name} AS SELECT column1, column2 FROM {test_table_name}")
yield connection
cursor.execute(f"DROP TABLE {test_table_name}")
cursor.execute(f"DROP TABLE {test_table_name} CASCADE")


@fixture
Expand All @@ -55,7 +61,9 @@ def table_schema() -> str:
"format": "datetime",
"airbyte_type": "timestamp_without_timezone",
},
"column5": {"type": ["null", "number"]}, # TODO: change once Decimal hits production
# If column check fails you might not have the latest Firebolt version
# with Decimal data type enabled
"column5": {"type": ["null", "string"], "airbyte_type": "big_number"},
"column6": {"type": "array", "items": {"type": ["null", "integer"]}},
"column7": {"type": ["null", "integer"]},
},
Expand All @@ -69,11 +77,38 @@ def test_stream(test_table_name: str, table_schema: str) -> AirbyteStream:


@fixture
def test_configured_catalogue(test_table_name: str, table_schema: str) -> ConfiguredAirbyteCatalog:
def view_schema() -> dict:
    """
    JSON schema describing the two columns expected in the test view.

    Both columns are nullable: column1 is a string and column2 is an integer.
    A new dictionary is built on every call, so callers may modify the result
    without affecting later calls.
    :return: JSON-schema dictionary with "type" and "properties" keys
    """
    return {
        "type": "object",
        "properties": {
            "column1": {"type": ["null", "string"]},
            "column2": {"type": ["null", "integer"]},
        },
    }


@fixture
def test_view_stream(test_view_name: str, view_schema: dict) -> AirbyteStream:
    """
    Build the AirbyteStream that discovery is expected to report for the test view.
    :param test_view_name: name assigned to the stream
    :param view_schema: JSON-schema dictionary assigned to the stream
    :return: AirbyteStream whose only supported sync mode is full_refresh
    """
    return AirbyteStream(name=test_view_name, json_schema=view_schema, supported_sync_modes=[SyncMode.full_refresh])


@fixture
def configured_catalogue(test_table_name: str, table_schema: str) -> ConfiguredAirbyteCatalog:
# Deleting one column to simulate manipulation in UI
del table_schema["properties"]["column1"]
append_stream = ConfiguredAirbyteStream(
stream=AirbyteStream(name=test_table_name, json_schema=table_schema),
stream=AirbyteStream(name=test_table_name, json_schema=table_schema, supported_sync_modes=[SyncMode.full_refresh]),
sync_mode=SyncMode.incremental,
destination_sync_mode=DestinationSyncMode.append,
)
return ConfiguredAirbyteCatalog(streams=[append_stream])


@fixture
def configured_view_catalogue(test_view_name: str, view_schema: str) -> ConfiguredAirbyteCatalog:
append_stream = ConfiguredAirbyteStream(
stream=AirbyteStream(name=test_view_name, json_schema=view_schema, supported_sync_modes=[SyncMode.full_refresh]),
sync_mode=SyncMode.incremental,
destination_sync_mode=DestinationSyncMode.append,
)
Expand Down Expand Up @@ -109,33 +144,62 @@ def test_check_succeeds(config: Dict[str, str]):


def test_discover(
config: Dict[str, str], create_test_data: Generator[Connection, None, None], test_table_name: str, test_stream: AirbyteStream
config: Dict[str, str],
create_test_data: Generator[Connection, None, None],
test_table_name: str,
test_view_name: str,
test_stream: AirbyteStream,
test_view_stream: AirbyteStream,
):
source = SourceFirebolt()
catalog = source.discover(MagicMock(), config)
assert any(stream.name == test_table_name for stream in catalog.streams), "Test table not found"
assert any(stream.name == test_view_name for stream in catalog.streams), "Test view not found"
for stream in catalog.streams:
if stream.name == test_table_name:
assert stream == test_stream
if stream.name == test_view_name:
assert stream == test_view_stream


def test_read(
config: Dict[str, str],
create_test_data: Generator[Connection, None, None],
test_table_name: str,
test_configured_catalogue: ConfiguredAirbyteCatalog,
configured_catalogue: ConfiguredAirbyteCatalog,
):
expected_data = [
{"column2": 221, "column3": "2021-01-01", "column4": "2021-01-01T12:00:01", "column6": [1, 2, 3], "column7": 1},
{
"column3": "2021-01-02",
"column4": "2021-01-02T12:00:02",
"column5": 1231232.12346, # TODO: change once Decimal is in production
# If column5 check fails you might not have the latest Firebolt version
# with Decimal data type enabled
"column5": "1231232.1234599999904570548442587065362",
"column6": [1, 2, 3],
},
]
source = SourceFirebolt()
result = source.read(logger=MagicMock(), config=config, catalog=test_configured_catalogue, state={})
result = source.read(logger=MagicMock(), config=config, catalog=configured_catalogue, state={})
data = list(result)
assert all([x.record.stream == test_table_name for x in data]), "Table name is incorrect"
assert [x.record.data for x in data] == expected_data, "Test data is not matching"


def test_view_read(
    config: Dict[str, str],
    create_test_data: Generator[Connection, None, None],
    test_view_name: str,
    configured_view_catalogue: ConfiguredAirbyteCatalog,
):
    """
    Read the test view through the source and compare the emitted records.

    Every emitted message must belong to the view's stream, and the record
    payloads must equal the expected rows in order. The second expected row
    carries no "column2" key.
    """
    expected_data = [
        {"column1": "my_value", "column2": 221},
        {
            "column1": "my_value2",
        },
    ]
    source = SourceFirebolt()
    result = source.read(logger=MagicMock(), config=config, catalog=configured_view_catalogue, state={})
    data = list(result)
    # This test reads a view, so the failure message names the view rather than a table.
    assert all(x.record.stream == test_view_name for x in data), "View name is incorrect"
    assert [x.record.data for x in data] == expected_data, "Test data is not matching"
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-firebolt/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk", "firebolt-sdk>=0.8.0"]
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "firebolt-sdk>=0.12.0"]

TEST_REQUIREMENTS = [
"pytest>=6.2.5", # 6.2.5 has python10 compatibility fixes
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@


import json
from typing import Any, Dict, List
from collections import defaultdict
from typing import Any, Dict, List, Tuple

from airbyte_cdk.logger import AirbyteLogger
from firebolt.async_db import Connection as AsyncConnection
Expand Down Expand Up @@ -72,23 +73,21 @@ async def establish_async_connection(config: json, logger: AirbyteLogger) -> Asy
return connection


def get_table_structure(connection: Connection) -> Dict[str, List[Tuple]]:
    """
    Get columns and their types for all the tables and views in the database.

    A single query reads information_schema.columns and leaves out every table
    that information_schema.tables lists with table_type 'EXTERNAL' or 'CATALOG'.
    :param connection: Connection object connected to a database
    :return: Dictionary mapping each table name to a list of
        (column_name, data_type, is_nullable) tuples, in the order rows were returned
    """
    column_mapping = defaultdict(list)
    cursor = connection.cursor()
    try:
        cursor.execute(
            "SELECT table_name, column_name, data_type, is_nullable FROM information_schema.columns "
            "WHERE table_name NOT IN (SELECT table_name FROM information_schema.tables WHERE table_type IN ('EXTERNAL', 'CATALOG'))"
        )
        for t_name, c_name, c_type, nullable in cursor.fetchall():
            column_mapping[t_name].append((c_name, c_type, nullable))
    finally:
        # Close the cursor even when the query or the fetch raises.
        cursor.close()
    return column_mapping
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#

import json
from asyncio import gather, get_event_loop
from typing import Dict, Generator

from airbyte_cdk.logger import AirbyteLogger
Expand All @@ -17,36 +16,13 @@
SyncMode,
)
from airbyte_cdk.sources import Source
from firebolt.async_db import Connection as AsyncConnection

from .database import establish_async_connection, establish_connection, get_firebolt_tables
from .database import establish_connection, get_table_structure
from .utils import airbyte_message_from_data, convert_type

SUPPORTED_SYNC_MODES = [SyncMode.full_refresh]


async def get_table_stream(connection: AsyncConnection, table: str) -> AirbyteStream:
"""
Get AirbyteStream for a particular table with table structure defined.
:param connection: Connection object connected to a database
:return: AirbyteStream object containing the table structure
"""
column_mapping = {}
cursor = connection.cursor()
await cursor.execute(f"SHOW COLUMNS {table}")
for t_name, c_name, c_type, nullable in await cursor.fetchall():
airbyte_type = convert_type(c_type, nullable)
column_mapping[c_name] = airbyte_type
cursor.close()
json_schema = {
"type": "object",
"properties": column_mapping,
}
return AirbyteStream(name=table, json_schema=json_schema, supported_sync_modes=SUPPORTED_SYNC_MODES)


class SourceFirebolt(Source):
def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
"""
Expand Down Expand Up @@ -87,14 +63,17 @@ def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
by their names and types)
"""

async def get_streams():
async with await establish_async_connection(config, logger) as connection:
tables = await get_firebolt_tables(connection)
logger.info(f"Found {len(tables)} available tables.")
return await gather(*[get_table_stream(connection, table) for table in tables])

loop = get_event_loop()
streams = loop.run_until_complete(get_streams())
with establish_connection(config, logger) as connection:
structure = get_table_structure(connection)

streams = []
for table, columns in structure.items():
column_mapping = {c_name: convert_type(c_type, nullable) for c_name, c_type, nullable in columns}
json_schema = {
"type": "object",
"properties": column_mapping,
}
streams.append(AirbyteStream(name=table, json_schema=json_schema, supported_sync_modes=SUPPORTED_SYNC_MODES))
logger.info(f"Provided {len(streams)} streams to the Aribyte Catalog.")
return AirbyteCatalog(streams=streams)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"title": "Firebolt Spec",
"type": "object",
"required": ["username", "password", "database"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"username": {
"type": "string",
Expand All @@ -16,7 +16,8 @@
"password": {
"type": "string",
"title": "Password",
"description": "Firebolt password."
"description": "Firebolt password.",
"airbyte_secret": true
},
"account": {
"type": "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ def convert_type(fb_type: str, nullable: bool) -> Dict[str, Union[str, Dict]]:
airbyte_type = convert_type(inner_type, nullable=True)
result = {"type": "array", "items": airbyte_type}
else:
# Strip complex type info e.g. DECIMAL(8,23) -> DECIMAL
fb_type = fb_type[: fb_type.find("(")] if "(" in fb_type else fb_type
# Remove NULL/NOT NULL from child type of an array e.g. ARRAY(INT NOT NULL)
fb_type = fb_type.removesuffix(" NOT NULL").removesuffix(" NULL")
result = map.get(fb_type.upper(), {"type": "string"})
if nullable:
result["type"] = ["null", result["type"]]
Expand Down
Loading

0 comments on commit 7b0aecb

Please sign in to comment.