diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index ec1cc85e81b4..1645fc837e1e 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -16,6 +16,11 @@ dockerImageTag: 0.1.0 documentationUrl: https://docs.airbyte.io/integrations/destinations/amazonsqs icon: amazonsqs.svg +- name: AWS Datalake + destinationDefinitionId: 99878c90-0fbd-46d3-9d98-ffde879d17fc + dockerRepository: airbyte/destination-aws-datalake + dockerImageTag: 0.1.0 + documentationUrl: https://docs.airbyte.io/integrations/destinations/aws-datalake - name: BigQuery destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 dockerRepository: airbyte/destination-bigquery diff --git a/airbyte-integrations/connectors/destination-aws-datalake/.dockerignore b/airbyte-integrations/connectors/destination-aws-datalake/.dockerignore new file mode 100644 index 000000000000..f45e49c10f18 --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/.dockerignore @@ -0,0 +1,6 @@ +* +!Dockerfile +!main.py +!destination_aws_datalake +!setup.py +!boto3-preview diff --git a/airbyte-integrations/connectors/destination-aws-datalake/Dockerfile b/airbyte-integrations/connectors/destination-aws-datalake/Dockerfile new file mode 100644 index 000000000000..8449398cce7d --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/Dockerfile @@ -0,0 +1,17 @@ +FROM python:3.7-slim +# FROM python:3.7.11-alpine3.14 + +# Bash is installed for more convenient debugging. +# RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/* + +WORKDIR /airbyte/integration_code +COPY destination_aws_datalake ./destination_aws_datalake +COPY main.py ./ +COPY setup.py ./ +RUN pip install . + +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-aws-datalake diff --git a/airbyte-integrations/connectors/destination-aws-datalake/README.md b/airbyte-integrations/connectors/destination-aws-datalake/README.md new file mode 100644 index 000000000000..a89b68c3acf2 --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/README.md @@ -0,0 +1,160 @@ +# Aws Datalake Destination + +This is the repository for the Aws Datalake destination connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/destinations/aws-datalake). + +## Local development + +### Prerequisites + +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.7.0` + +#### Build & Activate Virtual Environment and install dependencies + +From this connector directory, create a virtual environment: + +```bash +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. To activate it from the terminal, run: + +```bash +source .venv/bin/activate +pip install -r requirements.txt +``` + +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. 
`requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + +#### Building via Gradle + +From the Airbyte repository root, run: + +```bash +./gradlew :airbyte-integrations:connectors:destination-aws-datalake:build +``` + +#### Create credentials + +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/destinations/aws-datalake) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_aws_datalake/spec.json` file. +Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination aws-datalake test creds` +and place them into `secrets/config.json`. + +### Locally running the connector + +```bash +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Build + +First, make sure you build the latest Docker image: + +```bash +docker build . -t airbyte/destination-aws-datalake:dev +``` + +You can also build the connector image via Gradle: + +```bash +./gradlew :airbyte-integrations:connectors:destination-aws-datalake:airbyteDocker +``` + +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run + +Then run any of the connector commands as follows: + +```bash +docker run --rm airbyte/destination-aws-datalake:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-aws-datalake:dev check --config /secrets/config.json +# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages +cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-aws-datalake:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` + +## Testing + +Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. + +First install test dependencies into your virtual environment: + +```bash +pip install .[tests] +``` + +### Unit Tests + +To run unit tests locally, from the connector directory run: + +```bash +python -m pytest unit_tests +``` + +### Integration Tests + +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all destination connectors) and custom integration tests (which are specific to this connector). + +#### Custom Integration tests + +Place custom tests inside `integration_tests/` folder, then, from the connector root, run + +```bash +python -m pytest integration_tests +``` + +#### Acceptance Tests + +Coming soon: + +### Using gradle to run tests + +All commands should be run from airbyte project root. 
+To run unit tests: + +```bash +./gradlew :airbyte-integrations:connectors:destination-aws-datalake:unitTest +``` + +To run acceptance and custom integration tests: + +```bash +./gradlew :airbyte-integrations:connectors:destination-aws-datalake:integrationTest +``` + +## Dependency Management + +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. + +We split dependencies between two groups, dependencies that are: + +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list + +### Publishing a new version of the connector + +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? + +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/destination-aws-datalake/build.gradle b/airbyte-integrations/connectors/destination-aws-datalake/build.gradle new file mode 100644 index 000000000000..5c3928a365d5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/build.gradle @@ -0,0 +1,20 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +dependencies { + // https://mvnrepository.com/artifact/com.google.guava/guava + implementation 'com.google.guava:guava:30.1.1-jre' + + // https://mvnrepository.com/artifact/software.amazon.awssdk/athena + implementation 'software.amazon.awssdk:athena:2.17.42' + + // https://mvnrepository.com/artifact/software.amazon.awssdk/glue + implementation 'software.amazon.awssdk:glue:2.17.42' + + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-aws-datalake') +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/__init__.py b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/__init__.py new file mode 100644 index 000000000000..28338201f4e7 --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/__init__.py @@ -0,0 +1,26 @@ +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. 
+# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + + +from .destination import DestinationAwsDatalake + +__all__ = ["DestinationAwsDatalake"] diff --git a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/aws.py b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/aws.py new file mode 100644 index 000000000000..17c16679a831 --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/aws.py @@ -0,0 +1,261 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# +import boto3 +from airbyte_cdk.destinations import Destination +from botocore.exceptions import ClientError +from retrying import retry + +from .config_reader import AuthMode, ConnectorConfig + + +class AwsHandler: + COLUMNS_MAPPING = {"number": "float", "string": "string", "integer": "int"} + + def __init__(self, connector_config, destination: Destination): + self._connector_config: ConnectorConfig = connector_config + self._destination: Destination = destination + self._bucket_name = connector_config.bucket_name + self.logger = self._destination.logger + + self.create_session() + self.s3_client = self.session.client("s3", region_name=connector_config.region) + self.glue_client = self.session.client("glue") + self.lf_client = self.session.client("lakeformation") + + @retry(stop_max_attempt_number=10, wait_random_min=1000, wait_random_max=2000) + def create_session(self): + if self._connector_config.credentials_type == AuthMode.IAM_USER.value: + self._session = boto3.Session( + aws_access_key_id=self._connector_config.aws_access_key, + aws_secret_access_key=self._connector_config.aws_secret_key, + region_name=self._connector_config.region, + ) + elif self._connector_config.credentials_type == AuthMode.IAM_ROLE.value: + client = boto3.client("sts") + role = client.assume_role( + RoleArn=self._connector_config.role_arn, + RoleSessionName="airbyte-destination-aws-datalake", + ) + creds = role.get("Credentials", {}) + self._session = boto3.Session( + aws_access_key_id=creds.get("AccessKeyId"), + aws_secret_access_key=creds.get("SecretAccessKey"), + aws_session_token=creds.get("SessionToken"), + region_name=self._connector_config.region, + ) + else: + raise Exception("Session wasn't created") + + @property + def session(self) -> boto3.Session: + return self._session + + @retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000) + def head_bucket(self): + print(self._bucket_name) + self.s3_client.head_bucket(Bucket=self._bucket_name) + + @retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000) + def head_object(self, object_key): + return self.s3_client.head_object(Bucket=self._bucket_name, Key=object_key) + + @retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000) + def put_object(self, object_key, body): + self.s3_client.put_object(Bucket=self._bucket_name, Key=object_key, Body="\n".join(body)) + + @staticmethod + def batch_iterate(iterable, n=1): + size = len(iterable) + for ndx in range(0, size, n): + yield iterable[ndx : min(ndx 
+ n, size)] + + def get_table(self, txid, database_name: str, table_name: str, location: str): + table = None + try: + table = self.glue_client.get_table(DatabaseName=database_name, Name=table_name, TransactionId=txid) + except ClientError as e: + if e.response["Error"]["Code"] == "EntityNotFoundException": + table_input = { + "Name": table_name, + "TableType": "GOVERNED", + "StorageDescriptor": { + "Location": location, + "InputFormat": "org.apache.hadoop.mapred.TextInputFormat", + "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", + "SerdeInfo": {"SerializationLibrary": "org.openx.data.jsonserde.JsonSerDe", "Parameters": {"paths": ","}}, + }, + "PartitionKeys": [], + "Parameters": {"classification": "json", "lakeformation.aso.status": "true"}, + } + self.glue_client.create_table(DatabaseName=database_name, TableInput=table_input, TransactionId=txid) + table = self.glue_client.get_table(DatabaseName=database_name, Name=table_name, TransactionId=txid) + else: + err = e.response["Error"]["Code"] + self.logger.error(f"An error occurred: {err}") + raise + + if table: + return table + else: + return None + + @retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000) + def update_table(self, database, table_info, transaction_id): + self.glue_client.update_table(DatabaseName=database, TableInput=table_info, TransactionId=transaction_id) + + def preprocess_type(self, property_type): + if type(property_type) is list: + not_null_types = list(filter(lambda t: t != "null", property_type)) + if len(not_null_types) > 2: + return "string" + else: + return not_null_types[0] + else: + return property_type + + def update_table_schema(self, txid, database, table, schema): + table_info = table["Table"] + table_info_keys = list(table_info.keys()) + for k in table_info_keys: + if k not in [ + "Name", + "Description", + "Owner", + "LastAccessTime", + "LastAnalyzedTime", + "Retention", + "StorageDescriptor", + "PartitionKeys", + "ViewOriginalText", + "ViewExpandedText", + "TableType", + "Parameters", + "TargetTable", + "IsRowFilteringEnabled", + ]: + table_info.pop(k) + + self.logger.info("Schema = " + repr(schema)) + + columns = [{"Name": k, "Type": self.COLUMNS_MAPPING.get(self.preprocess_type(v["type"]), v["type"])} for (k, v) in schema.items()] + if "StorageDescriptor" in table_info: + table_info["StorageDescriptor"]["Columns"] = columns + else: + table_info["StorageDescriptor"] = {"Columns": columns} + self.update_table(database, table_info, txid) + self.glue_client.update_table(DatabaseName=database, TableInput=table_info, TransactionId=txid) + + @retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000) + def get_all_table_objects(self, txid, database, table): + table_objects = [] + + try: + res = self.lf_client.get_table_objects(DatabaseName=database, TableName=table, TransactionId=txid) + except ClientError as e: + if e.response["Error"]["Code"] == "EntityNotFoundException": + return [] + else: + err = e.response["Error"]["Code"] + self.logger.error(f"Could not get table objects due to error: {err}") + raise + + while True: + next_token = res.get("NextToken", None) + partition_objects = res.get("Objects") + table_objects.extend([p["Objects"] for p in partition_objects]) + if next_token: + res = self.lf_client.get_table_objects( + DatabaseName=database, + TableName=table, + TransactionId=txid, + NextToken=next_token, + ) + else: + break + flat_list = [item for sublist in table_objects for item in sublist] + return flat_list + + 
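+    # Note on batching: purge_table below removes every object currently
+    # registered on a governed table within the supplied transaction. The
+    # DeleteObject operations are chunked through batch_iterate (99 per call),
+    # presumably because UpdateTableObjects caps how many write operations a
+    # single request may carry; the batch size of 99 is a conservative choice
+    # here, not a documented API constant.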
@retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000) + def purge_table(self, txid, database, table): + self.logger.debug(f"Going to purge table {table}") + write_ops = [] + all_objects = self.get_all_table_objects(txid, database, table) + write_ops.extend([{"DeleteObject": {"Uri": o["Uri"]}} for o in all_objects]) + if len(write_ops) > 0: + self.logger.debug(f"{len(write_ops)} objects to purge") + for batch in self.batch_iterate(write_ops, 99): + self.logger.debug("Purging batch") + try: + self.lf_client.update_table_objects( + TransactionId=txid, + DatabaseName=database, + TableName=table, + WriteOperations=batch, + ) + except ClientError as e: + self.logger.error(f"Could not delete object due to exception {repr(e)}") + raise + else: + self.logger.debug("Table was empty, nothing to purge.") + + def update_governed_table(self, txid, database, table, bucket, object_key, etag, size): + self.logger.debug(f"Updating governed table {database}:{table}") + write_ops = [ + { + "AddObject": { + "Uri": f"s3://{bucket}/{object_key}", + "ETag": etag, + "Size": size, + } + } + ] + + self.lf_client.update_table_objects( + TransactionId=txid, + DatabaseName=database, + TableName=table, + WriteOperations=write_ops, + ) + + +class LakeformationTransaction: + def __init__(self, aws_handler: AwsHandler): + self._aws_handler = aws_handler + self._transaction = None + self._logger = aws_handler.logger + + @property + def txid(self): + return self._transaction["TransactionId"] + + def cancel_transaction(self): + self._logger.debug("Canceling Lakeformation Transaction") + self._aws_handler.lf_client.cancel_transaction(TransactionId=self.txid) + + def commit_transaction(self): + self._logger.debug("Commiting Lakeformation Transaction") + self._aws_handler.lf_client.commit_transaction(TransactionId=self.txid) + + def extend_transaction(self): + self._logger.debug("Extending Lakeformation Transaction") + self._aws_handler.lf_client.extend_transaction(TransactionId=self.txid) + + def __enter__(self, transaction_type="READ_AND_WRITE"): + self._logger.debug("Starting Lakeformation Transaction") + self._transaction = self._aws_handler.lf_client.start_transaction(TransactionType=transaction_type) + self._logger.debug(f"Transaction id = {self.txid}") + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._logger.debug("Exiting LakeformationTransaction context manager") + if exc_type: + self._logger.error("Exiting LakeformationTransaction context manager due to an exception") + self._logger.error(repr(exc_type)) + self._logger.error(repr(exc_val)) + self.cancel_transaction() + self._transaction = None + else: + self._logger.debug("Exiting LakeformationTransaction context manager due to reaching end of with block") + self.commit_transaction() + self._transaction = None diff --git a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/config_reader.py b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/config_reader.py new file mode 100644 index 000000000000..013587e2b895 --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/config_reader.py @@ -0,0 +1,42 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+# + +import enum + + +class AuthMode(enum.Enum): + IAM_ROLE = "IAM Role" + IAM_USER = "IAM User" + + +class ConnectorConfig: + def __init__( + self, + aws_account_id: str = None, + region: str = None, + credentials: dict = None, + bucket_name: str = None, + bucket_prefix: str = None, + lakeformation_database_name: str = None, + table_name: str = None, + ): + self.aws_account_id = aws_account_id + self.credentials = credentials + self.credentials_type = credentials.get("credentials_title") + self.region = region + self.bucket_name = bucket_name + self.bucket_prefix = bucket_prefix + self.lakeformation_database_name = lakeformation_database_name + self.table_name = table_name + + if self.credentials_type == AuthMode.IAM_USER.value: + self.aws_access_key = self.credentials.get("aws_access_key_id") + self.aws_secret_key = self.credentials.get("aws_secret_access_key") + elif self.credentials_type == AuthMode.IAM_ROLE.value: + self.role_arn = self.credentials.get("role_arn") + else: + raise Exception("Auth Mode not recognized.") + + def __str__(self): + return f"" diff --git a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/destination.py b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/destination.py new file mode 100644 index 000000000000..fc5151c18b18 --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/destination.py @@ -0,0 +1,113 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +import json +from typing import Any, Iterable, Mapping + +from airbyte_cdk import AirbyteLogger +from airbyte_cdk.destinations import Destination +from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status, Type +from botocore.exceptions import ClientError + +from .aws import AwsHandler, LakeformationTransaction +from .config_reader import ConnectorConfig +from .stream_writer import StreamWriter + + +class DestinationAwsDatalake(Destination): + def write( + self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage] + ) -> Iterable[AirbyteMessage]: + + """ + Reads the input stream of messages, config, and catalog to write data to the destination. + + This method returns an iterable (typically a generator of AirbyteMessages via yield) containing state messages received + in the input message stream. Outputting a state message means that every AirbyteRecordMessage which came before it has been + successfully persisted to the destination. This is used to ensure fault tolerance in the case that a sync fails before fully completing, + then the source is given the last state message output from this method as the starting point of the next sync. 
+ + :param config: dict of JSON configuration matching the configuration declared in spec.json + :param configured_catalog: The Configured Catalog describing the schema of the data being received and how it should be persisted in the + destination + :param input_messages: The stream of input messages received from the source + :return: Iterable of AirbyteStateMessages wrapped in AirbyteMessage structs + """ + + connector_config = ConnectorConfig(**config) + + try: + aws_handler = AwsHandler(connector_config, self) + except ClientError as e: + self.logger.error(f"Could not create session due to exception {repr(e)}") + raise + self.logger.debug("AWS session creation OK") + + with LakeformationTransaction(aws_handler) as tx: + # creating stream writers + streams = { + s.stream.name: StreamWriter( + name=s.stream.name, + aws_handler=aws_handler, + tx=tx, + connector_config=connector_config, + schema=s.stream.json_schema["properties"], + sync_mode=s.destination_sync_mode, + ) + for s in configured_catalog.streams + } + + for message in input_messages: + if message.type == Type.STATE: + yield message + else: + data = message.record.data + stream = message.record.stream + streams[stream].append_message(json.dumps(data, default=str)) + + for stream_name, stream in streams.items(): + stream.add_to_datalake() + + def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + """ + Tests if the input configuration can be used to successfully connect to the destination with the needed permissions + e.g: if a provided API token or password can be used to connect and write to the destination. + + :param logger: Logging object to display debug/info/error to the logs + (logs will not be accessible via airbyte UI if they are not passed to this logger) + :param config: Json object containing the configuration of this destination, content of this json is as specified in + the properties of the spec.json file + + :return: AirbyteConnectionStatus indicating a Success or Failure + """ + + connector_config = ConnectorConfig(**config) + + try: + aws_handler = AwsHandler(connector_config, self) + except (ClientError, AttributeError) as e: + logger.error(f"""Could not create session on {connector_config.aws_account_id} Exception: {repr(e)}""") + message = f"""Could not authenticate using {connector_config.credentials_type} on Account {connector_config.aws_account_id} Exception: {repr(e)}""" + return AirbyteConnectionStatus(status=Status.FAILED, message=message) + + try: + aws_handler.head_bucket() + except ClientError as e: + message = f"""Could not find bucket {connector_config.bucket_name} in aws://{connector_config.aws_account_id}:{connector_config.region} Exception: {repr(e)}""" + return AirbyteConnectionStatus(status=Status.FAILED, message=message) + + with LakeformationTransaction(aws_handler) as tx: + table_location = "s3://" + connector_config.bucket_name + "/" + connector_config.bucket_prefix + "/" + "airbyte_test/" + table = aws_handler.get_table( + txid=tx.txid, + database_name=connector_config.lakeformation_database_name, + table_name="airbyte_test", + location=table_location, + ) + if table is None: + message = f"Could not create a table in database {connector_config.lakeformation_database_name}" + return AirbyteConnectionStatus(status=Status.FAILED, message=message) + + return AirbyteConnectionStatus(status=Status.SUCCEEDED) diff --git a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/spec.json 
b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/spec.json new file mode 100644 index 000000000000..72b4f9d37b9d --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/spec.json @@ -0,0 +1,107 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/aws-datalake", + "supportsIncremental": true, + "supportsNormalization": false, + "supportsDBT": false, + "supported_destination_sync_modes": ["overwrite", "append"], + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "AWS Datalake Destination Spec", + "type": "object", + "required": ["credentials", "region", "bucket_name", "bucket_prefix"], + "additionalProperties": false, + "properties": { + "aws_account_id": { + "type": "string", + "title": "AWS Account Id", + "description": "target aws account id", + "examples": ["111111111111"] + }, + "region": { + "title": "AWS Region", + "type": "string", + "description": "Region name", + "airbyte_secret": false + }, + "credentials": { + "title": "Authentication mode", + "description": "Choose How to Authenticate to AWS.", + "type": "object", + "oneOf": [ + { + "type": "object", + "title": "IAM Role", + "required": ["role_arn", "credentials_title"], + "properties": { + "credentials_title": { + "type": "string", + "title": "Credentials Title", + "description": "Name of the credentials", + "const": "IAM Role", + "enum": ["IAM Role"], + "default": "IAM Role", + "order": 0 + }, + "role_arn": { + "title": "Target Role Arn", + "type": "string", + "description": "Will assume this role to write data to s3", + "airbyte_secret": false + } + } + }, + { + "type": "object", + "title": "IAM User", + "required": [ + "credentials_title", + "aws_access_key_id", + "aws_secret_access_key_id" + ], + "properties": { + "credentials_title": { + "type": "string", + "title": "Credentials Title", + "description": "Name of the credentials", + "const": "IAM User", + "enum": ["IAM User"], + "default": "IAM User", + "order": 0 + }, + "aws_access_key_id": { + "title": "Access Key Id", + "type": "string", + "description": "AWS User Access Key Id", + "airbyte_secret": true + }, + "aws_secret_access_key": { + "title": "Secret Access Key", + "type": "string", + "description": "Secret Access Key", + "airbyte_secret": true + } + } + } + ] + }, + "bucket_name": { + "title": "S3 Bucket Name", + "type": "string", + "description": "Name of the bucket", + "airbyte_secret": false + }, + "bucket_prefix": { + "title": "Target S3 Bucket Prefix", + "type": "string", + "description": "S3 prefix", + "airbyte_secret": false + }, + "lakeformation_database_name": { + "title": "Lakeformation Database Name", + "type": "string", + "description": "Which database to use", + "airbyte_secret": false + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/stream_writer.py b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/stream_writer.py new file mode 100644 index 000000000000..96c95bbd8b2e --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/stream_writer.py @@ -0,0 +1,58 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+# + +from datetime import datetime + +import nanoid +from airbyte_cdk.models import DestinationSyncMode + +from .aws import AwsHandler + + +class StreamWriter: + def __init__(self, name, aws_handler: AwsHandler, tx, connector_config, schema, sync_mode): + self._db = connector_config.lakeformation_database_name + self._bucket = connector_config.bucket_name + self._prefix = connector_config.bucket_prefix + self._table = name + self._aws_handler = aws_handler + self._tx = tx + self._schema = schema + self._sync_mode = sync_mode + self._messages = [] + self._logger = aws_handler.logger + + self._logger.info(f"Creating StreamWriter for {self._db}:{self._table}") + if sync_mode == DestinationSyncMode.overwrite: + self._logger.info(f"StreamWriter mode is OVERWRITE, need to purge {self._db}:{self._table}") + self._aws_handler.purge_table(self._tx.txid, self._db, self._table) + + def append_message(self, message): + self._logger.debug(f"Appending message to table {self._table}") + self._messages.append(message) + + def generate_object_key(self, prefix=None): + salt = nanoid.generate(size=10) + base = datetime.now().strftime("%Y%m%d%H%M%S") + path = f"{base}.{salt}.json" + if prefix: + path = f"{prefix}/{base}.{salt}.json" + + return path + + def add_to_datalake(self): + self._logger.info(f"Flushing messages to table {self._table}") + object_prefix = f"{self._prefix}/{self._table}" + object_key = self.generate_object_key(object_prefix) + self._aws_handler.put_object(object_key, self._messages) + res = self._aws_handler.head_object(object_key) + + table_location = "s3://" + self._bucket + "/" + self._prefix + "/" + self._table + "/" + table = self._aws_handler.get_table(self._tx.txid, self._db, self._table, table_location) + self._aws_handler.update_table_schema(self._tx.txid, self._db, table, self._schema) + + self._aws_handler.update_governed_table( + self._tx.txid, self._db, self._table, self._bucket, object_key, res["ETag"], res["ContentLength"] + ) + self._messages = [] diff --git a/airbyte-integrations/connectors/destination-aws-datalake/main.py b/airbyte-integrations/connectors/destination-aws-datalake/main.py new file mode 100644 index 000000000000..d3bf1d2886ee --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/main.py @@ -0,0 +1,31 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+# + + +import sys + +from destination_aws_datalake import DestinationAwsDatalake + +if __name__ == "__main__": + DestinationAwsDatalake().run(sys.argv[1:]) diff --git a/airbyte-integrations/connectors/destination-aws-datalake/requirements.txt b/airbyte-integrations/connectors/destination-aws-datalake/requirements.txt new file mode 100644 index 000000000000..d6e1198b1ab1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/requirements.txt @@ -0,0 +1 @@ +-e . diff --git a/airbyte-integrations/connectors/destination-aws-datalake/setup.py b/airbyte-integrations/connectors/destination-aws-datalake/setup.py new file mode 100644 index 000000000000..f5a53473f6c6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/setup.py @@ -0,0 +1,50 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = [ + "airbyte-cdk==0.1.6-rc1", + "boto3", + "retrying", + "nanoid", +] + +TEST_REQUIREMENTS = [ + "pytest~=6.1" +] + +setup( + name="destination_aws_datalake", + description="Destination implementation for Aws Datalake.", + author="Airbyte", + author_email="contact@airbyte.io", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/destination-aws-datalake/src/main/java/AthenaHelper.java b/airbyte-integrations/connectors/destination-aws-datalake/src/main/java/AthenaHelper.java new file mode 100644 index 000000000000..5d267a15199a --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/src/main/java/AthenaHelper.java @@ -0,0 +1,129 @@ +package io.airbyte.integrations.destination.aws_datalake; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import software.amazon.awssdk.services.athena.AthenaClient; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.services.athena.model.QueryExecutionContext; +import software.amazon.awssdk.services.athena.model.ResultConfiguration; +import software.amazon.awssdk.services.athena.model.StartQueryExecutionRequest; +import software.amazon.awssdk.services.athena.model.StartQueryExecutionResponse; +import software.amazon.awssdk.services.athena.model.AthenaException; +import software.amazon.awssdk.services.athena.model.GetQueryExecutionRequest; +import software.amazon.awssdk.services.athena.model.GetQueryExecutionResponse; +import software.amazon.awssdk.services.athena.model.QueryExecutionState; +import software.amazon.awssdk.services.athena.model.GetQueryResultsRequest; +import software.amazon.awssdk.services.athena.model.GetQueryResultsResponse; +import software.amazon.awssdk.services.athena.model.ColumnInfo; +import software.amazon.awssdk.services.athena.model.Row; +import software.amazon.awssdk.services.athena.model.Datum; +import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable; + +import java.util.List; + +public class AthenaHelper { + private AthenaClient athenaClient; + private String outputBucket; + private String workGroup; + private static final Logger LOGGER = LoggerFactory.getLogger(AthenaHelper.class); + + public AthenaHelper(AwsCredentials credentials, Region region, String outputBucket, String workGroup) { + var credProvider = StaticCredentialsProvider.create(credentials); + this.athenaClient = AthenaClient.builder().region(region).credentialsProvider(credProvider).build(); + this.outputBucket = outputBucket; + this.workGroup = workGroup; + } + + public String submitAthenaQuery(String database, String query) { + try { + + // The QueryExecutionContext allows us to set the database + QueryExecutionContext queryExecutionContext = QueryExecutionContext.builder() + .database(database).build(); + + // The result configuration specifies where the results of the query should go + ResultConfiguration resultConfiguration = ResultConfiguration.builder() + .outputLocation(outputBucket) + .build(); + + StartQueryExecutionRequest startQueryExecutionRequest = StartQueryExecutionRequest.builder() + .queryString(query) + .queryExecutionContext(queryExecutionContext) + .resultConfiguration(resultConfiguration) + 
.workGroup(workGroup) + .build(); + + StartQueryExecutionResponse startQueryExecutionResponse = athenaClient.startQueryExecution(startQueryExecutionRequest); + return startQueryExecutionResponse.queryExecutionId(); + } catch (AthenaException e) { + e.printStackTrace(); + System.exit(1); + } + return ""; + } + + public void waitForQueryToComplete(String queryExecutionId) throws InterruptedException { + GetQueryExecutionRequest getQueryExecutionRequest = GetQueryExecutionRequest.builder() + .queryExecutionId(queryExecutionId).build(); + + GetQueryExecutionResponse getQueryExecutionResponse; + boolean isQueryStillRunning = true; + while (isQueryStillRunning) { + getQueryExecutionResponse = athenaClient.getQueryExecution(getQueryExecutionRequest); + String queryState = getQueryExecutionResponse.queryExecution().status().state().toString(); + if (queryState.equals(QueryExecutionState.FAILED.toString())) { + throw new RuntimeException("The Amazon Athena query failed to run with error message: " + getQueryExecutionResponse + .queryExecution().status().stateChangeReason()); + } else if (queryState.equals(QueryExecutionState.CANCELLED.toString())) { + throw new RuntimeException("The Amazon Athena query was cancelled."); + } else if (queryState.equals(QueryExecutionState.SUCCEEDED.toString())) { + isQueryStillRunning = false; + } else { + // Sleep an amount of time before retrying again + Thread.sleep(1000); + } + System.out.println("The current status is: " + queryState); + } + } + + public GetQueryResultsIterable getResults(String queryExecutionId) { + + try { + + // Max Results can be set but if its not set, + // it will choose the maximum page size + GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder() + .queryExecutionId(queryExecutionId) + .build(); + + GetQueryResultsIterable getQueryResultsResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest); + return getQueryResultsResults; + + } catch (AthenaException e) { + e.printStackTrace(); + System.exit(1); + } + return null; + } + + public GetQueryResultsIterable runQuery(String database, String query) throws InterruptedException { + int retryCount = 0; + + while (retryCount < 10) { + var execId = submitAthenaQuery(database, query); + try { + waitForQueryToComplete(execId); + } catch (RuntimeException e) { + LOGGER.info("Athena query failed once. 
Retrying."); + retryCount++; + continue; + } + return getResults(execId); + } + LOGGER.info("Athena query failed and we are out of retries."); + return null; + } +} diff --git a/airbyte-integrations/connectors/destination-aws-datalake/src/main/java/AwsDatalakeDestinationConfig.java b/airbyte-integrations/connectors/destination-aws-datalake/src/main/java/AwsDatalakeDestinationConfig.java new file mode 100644 index 000000000000..7b93c839d53c --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/src/main/java/AwsDatalakeDestinationConfig.java @@ -0,0 +1,100 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.aws_datalake; + +import com.fasterxml.jackson.databind.JsonNode; + +public class AwsDatalakeDestinationConfig { + + private final String awsAccountId; + private final String region; + private final String accessKeyId; + private final String secretAccessKey; + private final String bucketName; + private final String prefix; + private final String databaseName; + + public AwsDatalakeDestinationConfig(String awsAccountId, + String region, + String accessKeyId, + String secretAccessKey, + String bucketName, + String prefix, + String databaseName) { + this.awsAccountId = awsAccountId; + this.region = region; + this.accessKeyId = accessKeyId; + this.secretAccessKey = secretAccessKey; + this.bucketName = bucketName; + this.prefix = prefix; + this.databaseName = databaseName; + + } + + public static AwsDatalakeDestinationConfig getAwsDatalakeDestinationConfig(JsonNode config) { + + final String aws_access_key_id = config.path("credentials").get("aws_access_key_id").asText(); + final String aws_secret_access_key = config.path("credentials").get("aws_secret_access_key").asText(); + + return new AwsDatalakeDestinationConfig( + config.get("aws_account_id").asText(), + config.get("region").asText(), + aws_access_key_id, + aws_secret_access_key, + config.get("bucket_name").asText(), + config.get("bucket_prefix").asText(), + config.get("lakeformation_database_name").asText() + ); + } + + public String getAwsAccountId() { + return awsAccountId; + } + + public String getRegion() { + return region; + } + + public String getAccessKeyId() { + return accessKeyId; + } + + public String getSecretAccessKey() { + return secretAccessKey; + } + + public String getBucketName() { + return bucketName; + } + + public String getPrefix() { + return prefix; + } + + public 
String getDatabaseName() { + return databaseName; + } + +} diff --git a/airbyte-integrations/connectors/destination-aws-datalake/src/main/java/GlueHelper.java b/airbyte-integrations/connectors/destination-aws-datalake/src/main/java/GlueHelper.java new file mode 100644 index 000000000000..0f04d6ac87df --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/src/main/java/GlueHelper.java @@ -0,0 +1,68 @@ +package io.airbyte.integrations.destination.aws_datalake; + +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; + + +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.GetTablesRequest; +import software.amazon.awssdk.services.glue.paginators.GetTablesIterable; +import software.amazon.awssdk.services.glue.model.GetTablesResponse; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest; +import software.amazon.awssdk.services.glue.model.GlueRequest; + +import java.util.List; +import java.util.ArrayList; +import java.util.Iterator; + +public class GlueHelper { + private AwsCredentials awsCredentials; + private Region region; + private GlueClient glueClient; + + public GlueHelper(AwsCredentials credentials, Region region) { + this.awsCredentials = credentials; + this.region = region; + + var credProvider = StaticCredentialsProvider.create(credentials); + this.glueClient = GlueClient.builder().region(region).credentialsProvider(credProvider).build(); + } + + private GetTablesIterable getAllTables(String DatabaseName) { + + GetTablesRequest getTablesRequest = GetTablesRequest.builder().databaseName(DatabaseName).build(); + GetTablesIterable getTablesPaginator = glueClient.getTablesPaginator(getTablesRequest); + + return getTablesPaginator; + } + + private BatchDeleteTableRequest getBatchDeleteRequest(String databaseName, GetTablesIterable getTablesPaginator) { + List tablesToDelete = new ArrayList(); + for (GetTablesResponse response : getTablesPaginator) { + List tablePage = response.tableList(); + Iterator
tableIterator = tablePage.iterator(); + while (tableIterator.hasNext()) { + Table table = tableIterator.next(); + tablesToDelete.add(table.name()); + } + } + BatchDeleteTableRequest batchDeleteRequest = BatchDeleteTableRequest.builder().databaseName(databaseName).tablesToDelete(tablesToDelete).build(); + return batchDeleteRequest; + } + + public void purgeDatabase(String databaseName) { + int countRetries = 0; + while (countRetries < 5) { + try { + GetTablesIterable allTables = getAllTables(databaseName); + BatchDeleteTableRequest batchDeleteTableRequest = getBatchDeleteRequest(databaseName, allTables); + glueClient.batchDeleteTable(batchDeleteTableRequest); + return; + } catch (Exception e) { + countRetries++; + } + } + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-aws-datalake/src/test-integration/java/io/airbyte/integrations/destination/aws_datalake/AwsDatalakeDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-aws-datalake/src/test-integration/java/io/airbyte/integrations/destination/aws_datalake/AwsDatalakeDestinationAcceptanceTest.java new file mode 100644 index 000000000000..fc0f63fdaec7 --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/src/test-integration/java/io/airbyte/integrations/destination/aws_datalake/AwsDatalakeDestinationAcceptanceTest.java @@ -0,0 +1,200 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.aws_datalake; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import java.util.List; +import com.google.common.collect.ImmutableMap; + +import com.fasterxml.jackson.databind.JsonNode; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.destination.aws_datalake.AwsDatalakeDestinationConfig; +import io.airbyte.integrations.destination.aws_datalake.AthenaHelper; +import io.airbyte.integrations.destination.aws_datalake.GlueHelper; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; + +// import com.amazonaws.auth.AWSCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.services.athena.model.GetQueryResultsResponse; +import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable; +import software.amazon.awssdk.services.athena.model.ColumnInfo; +import software.amazon.awssdk.services.athena.model.Row; +import software.amazon.awssdk.services.athena.model.Datum; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import com.google.common.collect.Maps; + +//import com.amazonaws.auth.BasicAWSCredentials; +import software.amazon.awssdk.regions.Region; + + +public class AwsDatalakeDestinationAcceptanceTest extends DestinationAcceptanceTest { + + private static final String CONFIG_PATH = "secrets/config.json"; + private static final Logger LOGGER = LoggerFactory.getLogger(AwsDatalakeDestinationAcceptanceTest.class); + + private JsonNode configJson; + private JsonNode configInvalidCredentialsJson; + protected AwsDatalakeDestinationConfig config; + private AthenaHelper athenaHelper; + private GlueHelper glueHelper; + + @Override + protected String getImageName() { + return "airbyte/destination-aws-datalake:dev"; + } + + @Override + protected JsonNode getConfig() { + // TODO: Generate the configuration JSON file to be used for running the destination during the test + // configJson can either be static and read from secrets/config.json directly + // or created in the setup method + return configJson; + } + + @Override + protected JsonNode getFailCheckConfig() { + JsonNode credentials = Jsons.jsonNode(ImmutableMap.builder() + .put("credentials_title", "IAM User") + .put("aws_access_key_id", "wrong-access-key") + .put("aws_secret_access_key", "wrong-secret") + .build()); + + JsonNode config = Jsons.jsonNode(ImmutableMap.builder() + .put("aws_account_id", "112233") + .put("region", "us-east-1") + .put("bucket_name", "test-bucket") + .put("bucket_prefix", "test") + .put("lakeformation_database_name", "lf_db") + .put("credentials", credentials) + .build()); + + return config; + } + + @Override + protected List retrieveRecords(TestDestinationEnv testEnv, + String streamName, + String namespace, + JsonNode streamSchema) + throws IOException, InterruptedException { + // TODO Implement this method to retrieve records which written to the destination by the connector. + // Records returned from this method will be compared against records provided to the connector + // to verify they were written correctly + LOGGER.info(String.format(">>>>>>>>>> namespace = %s, streamName = %s", namespace, streamName)); + // 2. 
Read from database:table (SELECT *) + String query = String.format("SELECT * FROM \"%s\".\"%s\"", config.getDatabaseName(), streamName); + LOGGER.info(String.format(">>>>>>>>>> query = %s", query)); + GetQueryResultsIterable results = athenaHelper.runQuery(config.getDatabaseName(), query); + // 3. return the rows as a list of JsonNodes + + return parseResults(results); + } + + protected List parseResults(GetQueryResultsIterable queryResults) { + + List processedResults = new ArrayList<>(); + + for (GetQueryResultsResponse result : queryResults) { + List columnInfoList = result.resultSet().resultSetMetadata().columnInfo(); + Iterator results = result.resultSet().rows().iterator(); + // processRow(results, columnInfoList); + // first row has column names + Row colNamesRow = results.next(); + while (results.hasNext()) { + Map jsonMap = Maps.newHashMap(); + Row r = results.next(); + Iterator colInfoIterator = columnInfoList.iterator(); + Iterator datum = r.data().iterator(); + while (colInfoIterator.hasNext() && datum.hasNext()) { + ColumnInfo colInfo = colInfoIterator.next(); + Datum value = datum.next(); + LOGGER.info(String.format("key = %s, value = %s, type = %s", colInfo.name(), value.varCharValue(), colInfo.type())); + Object typedFieldValue = getTypedFieldValue(colInfo.type(), value.varCharValue()); + if (typedFieldValue != null) { + jsonMap.put(colInfo.name(), typedFieldValue); + } + } + processedResults.add(Jsons.jsonNode(jsonMap)); + } + } + return processedResults; + } + + private static Object getTypedFieldValue(String typeName, String varCharValue) { + if (varCharValue == null) + return null; + return switch (typeName) { + case "real", "double", "float" -> Double.parseDouble(varCharValue); + case "varchar" -> varCharValue; + case "boolean" -> Boolean.parseBoolean(varCharValue); + case "integer" -> Integer.parseInt(varCharValue); + default -> null; + }; + } + + @Override + protected List resolveIdentifier(String identifier) { + final List result = new ArrayList<>(); + result.add(identifier); + result.add(identifier.toLowerCase()); + return result; + } + + private JsonNode loadJsonFile(String fileName) throws IOException { + final JsonNode configFromSecrets = Jsons.deserialize(IOs.readFile(Path.of(fileName))); + return (configFromSecrets); + } + + @Override + protected void setup(TestDestinationEnv testEnv) throws IOException { + configJson = loadJsonFile(CONFIG_PATH); + + + this.config = AwsDatalakeDestinationConfig.getAwsDatalakeDestinationConfig(configJson); + + AwsBasicCredentials awsCreds = AwsBasicCredentials.create(config.getAccessKeyId(), config.getSecretAccessKey()); + athenaHelper = new AthenaHelper(awsCreds, Region.US_EAST_1, String.format("s3://%s/airbyte_athena/", config.getBucketName()), "AmazonAthenaLakeFormationPreview"); + glueHelper = new GlueHelper(awsCreds, Region.US_EAST_1); + glueHelper.purgeDatabase(config.getDatabaseName()); + } + + @Override + protected void tearDown(TestDestinationEnv testEnv) { + // TODO Implement this method to run any cleanup actions needed after every test case + // glueHelper.purgeDatabase(config.getDatabaseName()); + } + +} diff --git a/airbyte-integrations/connectors/destination-aws-datalake/src/test/AwsDatalakeDestinationTest.java b/airbyte-integrations/connectors/destination-aws-datalake/src/test/AwsDatalakeDestinationTest.java new file mode 100644 index 000000000000..c38b5b0cbff4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-aws-datalake/src/test/AwsDatalakeDestinationTest.java @@ -0,0 +1,67 @@ +/* + * MIT 
+ *
+ * Copyright (c) 2020 Airbyte
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package io.airbyte.integrations.destination.aws_datalake;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import io.airbyte.commons.json.Jsons;
+import org.junit.jupiter.api.Test;
+
+class AwsDatalakeDestinationTest {
+
+  @Test
+  void testGetAwsDatalakeDestinationConfig() throws Exception {
+    JsonNode json = Jsons.deserialize("""
+        {
+          "bucket_prefix": "test_prefix",
+          "region": "test_region",
+          "auth_mode": "USER",
+          "bucket_name": "test_bucket",
+          "aws_access_key_id": "test_access_key",
+          "aws_account_id": "test_account_id",
+          "lakeformation_database_name": "test_database",
+          "aws_secret_access_key": "test_secret"
+        }""");
+
+    var config = AwsDatalakeDestinationConfig.getAwsDatalakeDestinationConfig(json);
+
+    assertEquals("test_prefix", config.getBucketPrefix());
+    assertEquals("test_region", config.getRegion());
+    assertEquals("test_access_key", config.getAccessKeyId());
+    assertEquals("test_secret", config.getSecretAccessKey());
+  }
+}
diff --git a/airbyte-integrations/connectors/destination-aws-datalake/unit_tests/unit_test.py b/airbyte-integrations/connectors/destination-aws-datalake/unit_tests/unit_test.py
new file mode 100644
index 000000000000..f03f99f7c46e
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-aws-datalake/unit_tests/unit_test.py
@@ -0,0 +1,25 @@
+# MIT License
+#
+# Copyright (c) 2020 Airbyte
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+
+def test_example_method():
+    assert True
diff --git a/docs/integrations/destinations/aws-datalake.md b/docs/integrations/destinations/aws-datalake.md
new file mode 100644
index 000000000000..64f121693a9d
--- /dev/null
+++ b/docs/integrations/destinations/aws-datalake.md
@@ -0,0 +1,102 @@
+# AWS Datalake
+
+## Overview
+
+The AWS Datalake destination connector allows you to sync data to AWS. It writes data as JSON files to S3 and
+updates the AWS Glue data catalog so that the data is available to other AWS services such as Athena, Glue jobs,
+EMR, and Redshift.
+
+### Sync overview
+#### Output schema
+
+The Glue tables are created with the schema information provided by the source, i.e. you will find the same columns
+and types in the destination tables as in the source.
+
+#### Features
+
+| Feature | Supported?\(Yes/No\) | Notes |
+| :--- | :--- | :--- |
+| Full Refresh Sync | Yes | |
+| Incremental - Append Sync | Yes | |
+| Namespaces | No | |
+
+## Getting started
+### Requirements
+
+To use this destination connector, you will need:
+* An AWS account
+* An S3 bucket where the data will be written
+* An AWS Lake Formation database where tables will be created (one per stream)
+* AWS credentials, either an Access Key ID / Secret Access Key pair or a role, with the following permissions:
+
+  * Writing objects to the S3 bucket
+  * Updating the Lake Formation database
+
+See the setup guide below for more information about creating these resources.
+
+### Setup guide
+#### Creating an AWS account
+
+Feel free to skip this section if you already have an AWS account.
+
+You will find the instructions to set up a new AWS account [here](https://aws.amazon.com/premiumsupport/knowledge-center/create-and-activate-aws-account/).
+
+#### Creating an S3 bucket
+
+Feel free to skip this section if you already have an S3 bucket.
+
+You will find the instructions to create an S3 bucket [here](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html).
+
+#### Creating a Lake Formation Database
+
+Feel free to skip this section if you already have a Lake Formation database.
+
+You will find the instructions to create a Lake Formation database [here](https://docs.aws.amazon.com/lake-formation/latest/dg/creating-database.html).
+
+#### Creating Credentials
+
+The AWS Datalake connector lets you authenticate with either a user or a role. In both cases, you will have to make
+sure that appropriate policies are in place.
+
+Feel free to skip this section if you already have appropriate credentials.
+
+**Option 1: Creating a user**
+
+You will find the instructions to create a user [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_users_create.html).
+Make sure to select "Programmatic Access" so that you get an access key ID and a secret access key.
+
+**Option 2: Creating a role**
+
+You will find the instructions to create a role [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html).
+
+**Assigning proper permissions**
+
+The policy used by the user or the role must have access to the following services:
+
+* AWS Lake Formation
+* AWS Glue
+* AWS S3
+
+You can use [the AWS policy generator](https://awspolicygen.s3.amazonaws.com/policygen.html) to help you generate an appropriate policy.
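+
+As an illustration, a policy granting the connector access to these three services could look like the sketch below.
+It is not taken from the connector itself: the bucket name is a placeholder, the `Resource` entries should be scoped
+to your own bucket and database, and the exact set of actions your setup requires may differ.
+
+```json
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Sid": "S3Access",
+      "Effect": "Allow",
+      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket", "s3:GetBucketLocation"],
+      "Resource": ["arn:aws:s3:::YOUR_BUCKET_NAME", "arn:aws:s3:::YOUR_BUCKET_NAME/*"]
+    },
+    {
+      "Sid": "GlueAccess",
+      "Effect": "Allow",
+      "Action": ["glue:GetDatabase", "glue:GetTable", "glue:GetTables", "glue:CreateTable", "glue:UpdateTable", "glue:BatchCreatePartition"],
+      "Resource": "*"
+    },
+    {
+      "Sid": "LakeFormationAccess",
+      "Effect": "Allow",
+      "Action": ["lakeformation:GetDataAccess"],
+      "Resource": "*"
+    }
+  ]
+}
+```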
+
+Please also make sure that the role or user you will use has appropriate permissions on the database in AWS Lake Formation.
+
+### Setup the AWS Datalake destination in Airbyte
+
+You should now have all the requirements needed to configure AWS Datalake as a destination in the UI. You'll need the
+following information to configure the destination (an illustrative configuration is shown after the list):
+
+- AWS Account ID: The ID of your AWS account
+- AWS Region: The region in which your resources are deployed
+- Authentication mode: "ROLE" if you are using a role, "USER" if you are using a user with an Access Key ID / Secret Access Key
+- Target Role ARN: The ARN of the role, if "Authentication mode" is "ROLE"
+- Access Key ID: The Access Key ID of the user, if "Authentication mode" is "USER"
+- Secret Access Key: The Secret Access Key of the user, if "Authentication mode" is "USER"
+- S3 Bucket Name: The bucket in which the data will be written
+- Target S3 Bucket Prefix: A prefix to prepend to the file names when writing to the bucket
+- Database: The database in which the tables will be created
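+
+As an illustration only, a configuration could look like the sketch below. The field names are reused from the
+sample configuration in this connector's acceptance test, the values are placeholders, and the authoritative
+schema is `destination_aws_datalake/spec.json`.
+
+```json
+{
+  "aws_account_id": "111122223333",
+  "region": "us-east-1",
+  "bucket_name": "your-datalake-bucket",
+  "bucket_prefix": "airbyte",
+  "lakeformation_database_name": "airbyte_database",
+  "credentials": {
+    "credentials_title": "IAM User",
+    "aws_access_key_id": "YOUR_ACCESS_KEY_ID",
+    "aws_secret_access_key": "YOUR_SECRET_ACCESS_KEY"
+  }
+}
+```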
+
+## Changelog
+
+| Version | Date | Pull Request | Subject |
+| :--- | :--- | :--- | :--- |
+| 0.1.0 | 2022-03-29 | [\#10760](https://github.com/airbytehq/airbyte/pull/10760) | Initial release |
\ No newline at end of file
diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md
index d1ed9c18639e..e70f3a30fdba 100644
--- a/docs/integrations/destinations/snowflake.md
+++ b/docs/integrations/destinations/snowflake.md
@@ -235,7 +235,7 @@ Now that you have set up the Snowflake destination connector, check out the foll
 | Version | Date | Pull Request | Subject |
 |:--------|:-----------| :----- | :------ |
-| 0.4.24 | 2022-03-24 | [\#11093](https://github.com/airbytehq/airbyte/pull/11093) | Added OAuth support |
+| 0.4.24 | 2022-03-24 | [\#11093](https://github.com/airbytehq/airbyte/pull/11093) | Added OAuth support (Compatible with Airbyte Version 0.35.60+) |
 | 0.4.22 | 2022-03-18 | [\#10793](https://github.com/airbytehq/airbyte/pull/10793) | Fix namespace with invalid characters |
 | 0.4.21 | 2022-03-18 | [\#11071](https://github.com/airbytehq/airbyte/pull/11071) | Switch to compressed on-disk buffering before staging to s3/internal stage |
 | 0.4.20 | 2022-03-14 | [\#10341](https://github.com/airbytehq/airbyte/pull/10341) | Add Azure blob staging support |