Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

🐛 🎉 Source Airtable: migrate to the Metadata API for dynamic schema generation #20846

Merged
merged 12 commits into from
Jan 9, 2023
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-airtable/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_airtable ./source_airtable
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/source-airtable
3 changes: 2 additions & 1 deletion airbyte-integrations/connectors/source-airtable/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ Customize `acceptance-test-config.yml` file to configure tests. See [Source Acce
If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py.
To run your integration tests with acceptance tests, from the connector root, run
```
python -m pytest integration_tests -p integration_tests.acceptance
docker build . --no-cache -t airbyte/source-airtable:dev \
&& python -m pytest integration_tests -p integration_tests.acceptance
```
To run your integration tests with docker

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
# See [Source Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/source-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-airtable:dev
tests:
acceptance_tests:
spec:
- spec_path: "source_airtable/spec.json"
tests:
- spec_path: "source_airtable/spec.json"
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
tests:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
tests:
- config_path: "secrets/config.json"
# Bypassed this check because the discovery mechanism was changed
backward_compatibility_tests_config:
bazarnov marked this conversation as resolved.
Show resolved Hide resolved
disable_for_version: "0.1.3"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,63 @@
"streams": [
{
"stream": {
"name": "Table 1",
"name": "users/table_1",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {}
},
"supported_sync_modes": ["full_refresh"],
"supported_destination_sync_modes": ["overwrite", "append_dedup"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "users/table_2",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {}
},
"supported_sync_modes": ["full_refresh"],
"supported_destination_sync_modes": ["overwrite", "append_dedup"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "users/field_type_test",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {}
},
"supported_sync_modes": ["full_refresh"],
"supported_destination_sync_modes": ["overwrite", "append_dedup"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "users/50_columns",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {}
},
"supported_sync_modes": ["full_refresh"],
"supported_destination_sync_modes": ["overwrite", "append_dedup"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "users/checkboxes",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
Expand All @@ -16,7 +72,7 @@
},
{
"stream": {
"name": "Table 2",
"name": "untitled_base/table_1",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
{
"api_key": "key####################",
"base_id": "app####################",
"tables": ["Table 1", "Table 2"]
"api_key": "key123456"
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
{
"api_key": "key1234567890",
"base_id": "app1234567890",
"tables": ["Table 1", "Table 2"]
"api_key": "key1234567890"
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,26 @@

from typing import Any, Dict

import requests
from airbyte_cdk.models import AirbyteStream
from airbyte_cdk.models.airbyte_protocol import DestinationSyncMode, SyncMode
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator


class Helpers(object):
class Helpers:
@staticmethod
def get_most_complete_row(auth: TokenAuthenticator, base_id: str, table: str, sample_size: int = 100) -> Dict[str, Any]:
url = f"https://api.airtable.com/v0/{base_id}/{table}?pageSize={sample_size}"
try:
response = requests.get(url, headers=auth.get_auth_header())
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
raise Exception("Invalid API key")
elif e.response.status_code == 404:
raise Exception(f"Table '{table}' not found")
else:
raise Exception(f"Error getting first row from table {table}: {e}")
json_response = response.json()
records = json_response.get("records", [])
most_complete_row = records[0]
for record in records:
if len(record.keys()) > len(most_complete_row.keys()):
most_complete_row = record
return most_complete_row
def clean_name(name_str: str) -> str:
    """Normalize an Airtable base/table name into a stream-name segment.

    Strips surrounding whitespace FIRST — in the previous order
    (`replace` then `strip`) leading/trailing spaces were converted to
    underscores before the strip ran, so `" Table 1 "` became
    `"_table_1_"` instead of `"table_1"`. After stripping, inner
    spaces are replaced with underscores and the result is lowercased.

    :param name_str: raw name as returned by the Airtable API
    :return: lowercase, underscore-separated identifier
    """
    return name_str.strip().replace(" ", "_").lower()

@staticmethod
def get_json_schema(record: Dict[str, Any]) -> Dict[str, str]:
fields = record.get("fields", {})
def get_json_schema(table: Dict[str, Any]) -> Dict[str, str]:
fields = table.get("fields", {})
properties = {
"_airtable_id": {"type": ["null", "string"]},
"_airtable_created_time": {"type": ["null", "string"]},
}

for field in fields:
properties[field] = {"type": ["null", "string"]}
field_name = Helpers.clean_name(field.get("name"))
properties[field_name] = {"type": ["null", "string"]}

json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
Expand All @@ -52,9 +34,9 @@ def get_json_schema(record: Dict[str, Any]) -> Dict[str, str]:
return json_schema

@staticmethod
def get_airbyte_stream(table: str, json_schema: Dict[str, Any]) -> AirbyteStream:
def get_airbyte_stream(stream_name: str, json_schema: Dict[str, Any]) -> AirbyteStream:
return AirbyteStream(
name=table,
name=stream_name,
json_schema=json_schema,
supported_sync_modes=[SyncMode.full_refresh],
supported_destination_sync_modes=[DestinationSyncMode.overwrite, DestinationSyncMode.append_dedup],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,99 +3,77 @@
#


from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
import logging
from typing import Any, Iterable, List, Mapping, Tuple

import requests
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteCatalog
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

from .helpers import Helpers
from .streams import AirtableBases, AirtableStream, AirtableTables


# Basic full refresh stream
class AirtableStream(HttpStream, ABC):
url_base = "https://api.airtable.com/v0/"
primary_key = "id"
transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)

def __init__(self, base_id: str, table_name: str, schema, **kwargs):
super().__init__(**kwargs)
self.base_id = base_id
self.table_name = table_name
self.schema = schema

@property
def name(self):
return self.table_name

def get_json_schema(self) -> Mapping[str, Any]:
return self.schema

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
json_response = response.json()
offset = json_response.get("offset", None)
if offset:
return {"offset": offset}
return None

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
if next_page_token:
return next_page_token
return {}

def process_records(self, records):
for record in records:
data = record.get("fields", {})
processed_record = {"_airtable_id": record.get("id"), "_airtable_created_time": record.get("createdTime"), **data}
yield processed_record

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
json_response = response.json()
records = json_response.get("records", [])
records = self.process_records(records)
yield from records

def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return f"{self.base_id}/{self.table_name}"
class SourceAirtable(AbstractSource):

logger: logging.Logger = logging.getLogger("airbyte")
streams_catalog: Iterable[Mapping[str, Any]] = []

# Source
class SourceAirtable(AbstractSource):
def check_connection(self, logger, config) -> Tuple[bool, any]:
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
auth = TokenAuthenticator(token=config["api_key"])
for table in config["tables"]:
try:
Helpers.get_most_complete_row(auth, config["base_id"], table)
except Exception as e:
return False, str(e)
return True, None
try:
# try reading first table from each base, to check the connectivity,
for base in AirtableBases(authenticator=auth).read_records(sync_mode=None):
base_id = base.get("id")
base_name = base.get("name")
self.logger.info(f"Reading first table info for base: {base_name}")
next(AirtableTables(base_id=base_id, authenticator=auth).read_records(sync_mode=None))
return True, None
except Exception as e:
return False, str(e)

def prepare_catalog_for_base(self, base_id: str, base_name: str, base_tables: list) -> Iterable[Mapping[str, Any]]:
bazarnov marked this conversation as resolved.
Show resolved Hide resolved
for table in base_tables:
if table not in self.streams_catalog:
self.streams_catalog.append(
{
"stream_path": f"{base_id}/{table.get('id')}",
"stream": Helpers.get_airbyte_stream(
f"{base_name}/{Helpers.clean_name(table.get('name'))}",
Helpers.get_json_schema(table),
),
}
)

def discover(self, logger: AirbyteLogger, config) -> AirbyteCatalog:
streams = []
"""
Override to provide the dynamic schema generation capabilities,
using resource available for authenticated user.

Retrieve: Bases, Tables from each Base, generate JSON Schema for each table.
"""
auth = TokenAuthenticator(token=config["api_key"])
for table in config["tables"]:
record = Helpers.get_most_complete_row(auth, config["base_id"], table)
json_schema = Helpers.get_json_schema(record)
airbyte_stream = Helpers.get_airbyte_stream(table, json_schema)
streams.append(airbyte_stream)
return AirbyteCatalog(streams=streams)

# list all bases available for authenticated account
for base in AirtableBases(authenticator=auth).read_records(sync_mode=None):
# list and process each table under each base to generate the JSON Schema
self.prepare_catalog_for_base(
base_id=base.get("id"),
base_name=Helpers.clean_name(base.get("name")),
base_tables=list(AirtableTables(base_id=base.get("id"), authenticator=auth).read_records(sync_mode=None)),
)
return AirbyteCatalog(streams=[stream["stream"] for stream in self.streams_catalog])

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth = TokenAuthenticator(token=config["api_key"])
streams = []
for table in config["tables"]:
record = Helpers.get_most_complete_row(auth, config["base_id"], table)
json_schema = Helpers.get_json_schema(record)
stream = AirtableStream(base_id=config["base_id"], table_name=table, authenticator=auth, schema=json_schema)
streams.append(stream)
return streams
if not self.streams_catalog:
self.discover(None, config)

for stream in self.streams_catalog:
yield AirtableStream(
stream_path=stream["stream_path"],
stream_name=stream["stream"].name,
stream_schema=stream["stream"].json_schema,
authenticator=TokenAuthenticator(token=config["api_key"]),
)
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,14 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Airtable Source Spec",
"type": "object",
"required": ["api_key", "base_id", "tables"],
"required": ["api_key"],
"properties": {
"api_key": {
"type": "string",
"description": "The API Key for the Airtable account. See the <a href=\"https://support.airtable.com/hc/en-us/articles/219046777-How-do-I-get-my-API-key-\">Support Guide</a> for more information on how to obtain this key.",
"description": "The API Key or PAT for the Airtable account. See the <a href=\"https://airtable.com/developers/web/guides/personal-access-tokens\">Support Guide</a> for more information on how to obtain this key.",
"title": "API Key",
"airbyte_secret": true,
"examples": ["key1234567890"]
},
"base_id": {
"type": "string",
"description": "The Base ID to integrate the data from. You can find the Base ID following the link <a href=\"https://airtable.com/api\">Airtable API</a>, log in to your account, select the base you need and find Base ID in the docs.",
"title": "Base ID",
"examples": ["app1234567890"]
},
"tables": {
"type": "array",
"items": {
"type": "string"
},
"description": "The list of Tables to integrate.",
"title": "Tables",
"examples": ["table 1", "table 2"]
}
}
}
Expand Down
Loading