🎉 Source Table Storage: Add incremental append capability (#14212)
* Add incremental append load to azure table storage. Resolves #11275

* fix: revert table name

* fix: integration tests are failing

* chore: update the version

* auto-bump connector version [ci skip]

Co-authored-by: Harshith Mullapudi <harshithmullapudi@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
3 people authored and mfsiega-airbyte committed Jul 21, 2022
1 parent 54810d0 commit 7766695
Showing 17 changed files with 300 additions and 157 deletions.
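At a high level, the connector now tracks the last `PartitionKey` value it has seen per stream (see the state and catalog samples below) and filters subsequent reads against it, appending only newer entities instead of re-reading the whole table. A minimal sketch of how such a filter can be derived from saved state — the helper below is illustrative, not code from this commit; the stream and cursor names are taken from the sample files further down:

```python
from typing import Optional


def build_filter_query(stream_name: str, state: dict) -> Optional[str]:
    """Illustrative only: turn saved state into an OData filter for Azure Tables."""
    cursor = state.get(stream_name, {}).get("PartitionKey")
    if cursor is None:
        return None  # no saved state -> read the whole table
    return f"PartitionKey gt '{cursor}'"


# With the sample state from this commit, only newer partitions are selected.
print(build_filter_query("Test", {"Test": {"PartitionKey": "1"}}))  # PartitionKey gt '1'
```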
@@ -72,7 +72,7 @@
 - name: Azure Table Storage
   sourceDefinitionId: 798ae795-5189-42b6-b64e-3cb91db93338
   dockerRepository: airbyte/source-azure-table
-  dockerImageTag: 0.1.1
+  dockerImageTag: 0.1.2
   documentationUrl: https://docs.airbyte.io/integrations/sources/azure-table
   icon: azureblobstorage.svg
   sourceType: database
@@ -726,7 +726,7 @@
 - - "client_secret"
 oauthFlowOutputParameters:
 - - "refresh_token"
-- dockerImage: "airbyte/source-azure-table:0.1.1"
+- dockerImage: "airbyte/source-azure-table:0.1.2"
 spec:
 documentationUrl: "https://docsurl.com"
 connectionSpecification:
@@ -34,5 +34,5 @@ COPY source_azure_table ./source_azure_table
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.1
+LABEL io.airbyte.version=0.1.2
 LABEL io.airbyte.name=airbyte/source-azure-table
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-azure-table/README.md
@@ -49,7 +49,7 @@ and place them into `secrets/config.json`.
 python main.py spec
 python main.py check --config secrets/config.json
 python main.py discover --config secrets/config.json
-python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
+python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json --state integration_tests/state.json
 ```
 
 ### Locally running the connector docker image
@@ -73,7 +73,7 @@ Then run any of the connector commands as follows:
 docker run --rm airbyte/source-azure-table:dev spec
 docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-azure-table:dev check --config /secrets/config.json
 docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-azure-table:dev discover --config /secrets/config.json
-docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-azure-table:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
+docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-azure-table:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json --state /integration_tests/state.json
 ```
 ## Testing
 Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
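The `read` commands above now accept a `--state` file holding the last cursor value per stream. A minimal way to seed such a file for a local run (the path and values simply mirror the sample files added in this commit):

```python
import json

# Seed integration_tests/state.json so the next `read --state ...` run only
# pulls entities past PartitionKey "1" (values taken from this commit's samples).
state = {"Test": {"PartitionKey": "1"}}

with open("integration_tests/state.json", "w") as f:
    json.dump(state, f, indent=2)
```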
@@ -15,16 +15,13 @@ tests:
   - config_path: "secrets/config.json"
     configured_catalog_path: "integration_tests/configured_catalog.json"
     empty_streams: []
-# TODO uncomment this block to specify that the tests should assert the connector outputs the records provided in the input file a file
-#  expect_records:
-#    path: "integration_tests/expected_records.txt"
-#    extra_fields: no
-#    exact_order: no
-#    extra_records: yes
-#  incremental: # TODO if your connector does not implement incremental sync, remove this block
-#    - config_path: "secrets/config.json"
-#      configured_catalog_path: "integration_tests/configured_catalog.json"
-#      future_state_path: "integration_tests/abnormal_state.json"
+    validate_schema: False
+  incremental:
+    - config_path: "secrets/config.json"
+      configured_catalog_path: "integration_tests/configured_catalog.json"
+      future_state_path: "integration_tests/abnormal_state.json"
   full_refresh:
     - config_path: "secrets/config.json"
       configured_catalog_path: "integration_tests/configured_catalog.json"
+      ignored_fields:
+        "AirbyteTest": ["record"]
@@ -1,5 +1,5 @@
 {
-  "books": {
-    "timestamp": "2021-10-16T14:28:23.708515Z"
+  "Test": {
+    "PartitionKey": "abcd"
   }
 }
@@ -2,41 +2,20 @@
   "streams": [
     {
       "stream": {
-        "name": "books",
-        "supported_sync_modes": ["full_refresh"],
-        "source_defined_cursor": true,
-        "json_schema": {
-          "properties": {
-            "data": {
-              "type": "object"
-            },
-            "additionalProperties": {
-              "type": "boolean"
-            }
-          }
-        }
-      },
-      "sync_mode": "full_refresh",
-      "destination_sync_mode": "append"
-    },
-    {
-      "stream": {
-        "name": "booksraw",
-        "supported_sync_modes": ["full_refresh"],
-        "source_defined_cursor": true,
-        "json_schema": {
-          "properties": {
-            "data": {
-              "type": "object"
-            },
-            "additionalProperties": {
-              "type": "boolean"
-            }
-          }
-        }
-      },
-      "sync_mode": "full_refresh",
-      "destination_sync_mode": "append"
+        "name": "Test",
+        "json_schema": {
+          "properties": {
+            "PartitionKey": {
+              "type": "string"
+            }
+          }
+        },
+        "supported_sync_modes": ["full_refresh", "incremental"]
+      },
+      "source_defined_cursor": true,
+      "sync_mode": "incremental",
+      "destination_sync_mode": "append",
+      "cursor_field": ["PartitionKey"]
     }
   ]
 }
@@ -5,20 +5,17 @@
         "name": "Test",
         "json_schema": {
           "properties": {
-            "data": {
-              "type": "object"
-            },
-            "additionalProperties": {
-              "type": "boolean"
+            "PartitionKey": {
+              "type": "string"
             }
           }
         },
-        "supported_sync_modes": ["full_refresh"],
-        "source_defined_cursor": false,
-        "default_cursor_field": ["metadata"]
+        "supported_sync_modes": ["full_refresh", "incremental"]
       },
-      "sync_mode": "full_refresh",
-      "destination_sync_mode": "overwrite"
+      "source_defined_cursor": true,
+      "sync_mode": "incremental",
+      "destination_sync_mode": "append",
+      "cursor_field": ["PartitionKey"]
     }
   ]
 }
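For reference, the configured catalogs above can also be built with the Airbyte CDK models — a sketch assuming the standard `airbyte_cdk.models` classes, not code from the connector itself:

```python
from airbyte_cdk.models import (
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
)

# Mirrors the JSON catalog above: one incremental stream cursored on PartitionKey.
stream = AirbyteStream(
    name="Test",
    json_schema={"properties": {"PartitionKey": {"type": "string"}}},
    supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
    source_defined_cursor=True,
)

catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=stream,
            sync_mode=SyncMode.incremental,
            destination_sync_mode=DestinationSyncMode.append,
            cursor_field=["PartitionKey"],
        )
    ]
)

print(catalog.json(exclude_unset=True))
```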
@@ -1,5 +1,5 @@
 {
-  "books": {
-    "timestamp": "2021-10-16T14:28:23.708585Z"
+  "Test": {
+    "PartitionKey": "1"
   }
 }
@@ -0,0 +1,5 @@
+{
+  "Test": {
+    "PartitionKey": "1"
+  }
+}
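After finishing a stream, the source would typically emit the new cursor as a STATE message so the platform can persist it for the next run; a sketch using the CDK models (again, illustrative rather than the connector's own code):

```python
from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, Type

# Emit the last PartitionKey seen for the "Test" stream as a STATE message.
state_message = AirbyteMessage(
    type=Type.STATE,
    state=AirbyteStateMessage(data={"Test": {"PartitionKey": "1"}}),
)
print(state_message.json(exclude_unset=True))
```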
@@ -9,6 +9,7 @@
 
 TEST_REQUIREMENTS = [
     "pytest~=6.1",
+    "pytest-mock~=3.6.1",
     "source-acceptance-test",
 ]
 
@@ -2,17 +2,16 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #
 
-import re
-from typing import Dict, Iterable, List
+from typing import Iterable
 
 from airbyte_cdk.logger import AirbyteLogger
-from airbyte_cdk.models import AirbyteStream
+from azure.core.paging import ItemPaged
 from azure.data.tables import TableClient, TableServiceClient
 
 from . import constants
 
 
-class Reader:
+class AzureTableReader:
     """
     This reader reads data from given table
@@ -31,14 +30,17 @@ class Reader:
 
     Methods
     -------
-    get_table_service()
+    get_table_service_client()
         Returns azure table service client from connection string.
     get_table_client(table_name: str)
         Returns azure table client from connection string.
-    get_streams()
-        Fetches all tables from storage account and returns them in Airbyte stream.
+    get_tables()
+        Fetches all tables from storage account
+    read_table()
+        Reads data from an Azure table
 
@@ -54,12 +56,11 @@ def __init__(self, logger: AirbyteLogger, config: dict):
         self.account_name = config[constants.azure_storage_account_name_key_name]
         self.access_key = config[constants.azure_storage_access_key_key_name]
         self.endpoint_suffix = config[constants.azure_storage_endpoint_suffix_key_name]
-        self.endpoint = "{}.table.{}".format(self.account_name, self.endpoint_suffix)
         self.connection_string = "DefaultEndpointsProtocol=https;AccountName={};AccountKey={};EndpointSuffix={}".format(
             self.account_name, self.access_key, self.endpoint_suffix
         )
 
-    def get_table_service(self) -> TableServiceClient:
+    def get_table_service_client(self) -> TableServiceClient:
         """
         Returns azure table service client from connection string.
         Table service client facilitate interaction with tables. Please read more here - https://docs.microsoft.com/en-us/rest/api/storageservices/operations-on-tables
@@ -88,55 +89,30 @@ def get_table_client(self, table_name: str) -> TableClient:
         except Exception as e:
             raise Exception(f"An exception occurred: {str(e)}")
 
-    def get_streams(self) -> List[AirbyteStream]:
+    def get_tables(self) -> ItemPaged:
         """
         Fetches all tables from storage account and returns them in Airbyte stream.
         """
         try:
-            streams = []
-            table_client = self.get_table_service()
-            tables_iterator = table_client.list_tables(results_per_page=constants.results_per_page)
-            for table in tables_iterator:
-                stream_name = table.name
-                stream = AirbyteStream(name=stream_name, json_schema=self.get_typed_schema)
-                stream.supported_sync_modes = ["full_refresh"]
-                streams.append(stream)
-            self.logger.info(f"Total {streams.count} streams found.")
-            return streams
+            table_service_client = self.get_table_service_client()
+            tables_iterator = table_service_client.list_tables(results_per_page=constants.results_per_page)
+            return tables_iterator
         except Exception as e:
             raise Exception(f"An exception occurred: {str(e)}")
 
-    @property
-    def get_typed_schema(self) -> object:
-        """Static schema for tables"""
-        return {
-            "$schema": "http://json-schema.org/draft-07/schema#",
-            "type": "object",
-            "properties": {"data": {"type": "object"}, "additionalProperties": {"type": "boolean"}},
-        }
-
-    @property
-    def stream_name(self) -> str:
-        return str(self._client.table_name)
-
-    @property
-    def stream_url(self) -> str:
-        return str(self._client.url)
-
-    def read(self, client: TableClient, filter_query=None, parameters=None) -> Iterable:
+    def read_table(self, table_client: TableClient, filter_query: str = None) -> Iterable:
+        """
+        Reads data from an Azure table.
+
+        Parameters
+        ----------
+        table_client : TableClient
+            table client object to be able to access querying methods.
+        filter_query : str
+            either None or a query to pull data from table storage (based on the PartitionKey)
+        """
         if filter_query is None:
-            return client.list_entities()
+            return table_client.list_entities()
         else:
-            return client.query_entities(filter=filter_query, results_per_page=constants.results_per_page)
-
-    def get_filter_query(self, stream_name: str, state: Dict[str, any]) -> str:
-        watermark = state["stream_name"]
-        if watermark is None or watermark is dict:
-            return None
-        else:
-            return f"Timestamp gt datetime'{watermark}'"
-
-    @staticmethod
-    def is_table_name_valid(self, name: str) -> bool:
-        """Validates the tables name against regex - https://docs.microsoft.com/en-us/rest/api/storageservices/Understanding-the-Table-Service-Data-Model?redirectedfrom=MSDN#characters-disallowed-in-key-fields"""
-        return re.match(constants.table_name_regex, name)
+            return table_client.query_entities(query_filter=filter_query, results_per_page=constants.results_per_page)
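A rough usage sketch of the reworked reader above. The import path and config keys are assumptions — the connector resolves its real key names through its `constants` module — so treat this as illustration only:

```python
from airbyte_cdk.logger import AirbyteLogger
from source_azure_table.azure_table import AzureTableReader  # module path assumed

# Config keys are placeholders; the connector looks up the real names via `constants`.
config = {
    "storage_account_name": "mystorageaccount",
    "storage_access_key": "<access key>",
    "storage_endpoint_suffix": "core.windows.net",
}

reader = AzureTableReader(AirbyteLogger(), config)

for table in reader.get_tables():
    table_client = reader.get_table_client(table.name)
    # Full read when there is no saved state; otherwise filter past the stored cursor.
    for entity in reader.read_table(table_client, filter_query="PartitionKey gt '1'"):
        print(table.name, entity)
```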