From 58b569da18dbc542f79bdc3e9a40f690e5804b60 Mon Sep 17 00:00:00 2001 From: Alasdair Brown Date: Tue, 26 Oct 2021 17:23:10 +0100 Subject: [PATCH] :tada: Source Amazon SQS: New connector (#6937) * Initial commit, working source with static Creds * Typo in example queue url * Adds auto delete of messages after read * Adds visibility timeout * remove insecure comments from AWS IAM Key spec * explicitly set supported sync modes * explicit sync mode should be lower case * Adds unit tests for check, discover, read * remove incremental acceptance test block * remove incremental from conf catalog sample * remove test requirement moto from main req * align int catalog sample with sample_files * fixing catalog configs * acceptance testing config * adds expected records txt * automated formatting changes * remove expected records block from acpt test * Adds Docs page * Amends formatting on readme * Adds doc link to summary * Improve error handling & debug logging * Adds bootstrap.md * Add a todo suggestion for batch output * Adds SQS to integrations readme list * lower case properties * removed unused line * uses enum for aws region * updates sample configs to use lowercase * required props to lower case * add missed property to lowercase * gradle formatting * Fixing issues from acceptance tests * annotate secrets in spec.json with airbyte_secret * Adds explicit warnings about data loss when using Delete Message option --- .../source-amazon-sqs/.dockerignore | 7 + .../connectors/source-amazon-sqs/Dockerfile | 38 ++++ .../connectors/source-amazon-sqs/README.md | 129 ++++++++++++++ .../acceptance-test-config.yml | 25 +++ .../acceptance-test-docker.sh | 16 ++ .../connectors/source-amazon-sqs/bootstrap.md | 26 +++ .../connectors/source-amazon-sqs/build.gradle | 14 ++ .../integration_tests/__init__.py | 3 + .../integration_tests/abnormal_state.json | 5 + .../integration_tests/acceptance.py | 16 ++ .../integration_tests/catalog.json | 24 +++ .../integration_tests/configured_catalog.json | 27 +++ .../integration_tests/expected_records.txt | 1 + .../integration_tests/invalid_config.json | 11 ++ .../integration_tests/sample_config.json | 11 ++ .../integration_tests/sample_state.json | 5 + .../connectors/source-amazon-sqs/main.py | 13 ++ .../source-amazon-sqs/requirements.txt | 3 + .../sample_files/configured_catalog.json | 27 +++ .../connectors/source-amazon-sqs/setup.py | 23 +++ .../source_amazon_sqs/__init__.py | 8 + .../source_amazon_sqs/source.py | 165 ++++++++++++++++++ .../source_amazon_sqs/spec.json | 105 +++++++++++ .../source-amazon-sqs/unit_tests/unit_test.py | 130 ++++++++++++++ docs/SUMMARY.md | 1 + docs/integrations/README.md | 1 + docs/integrations/sources/amazon-sqs.md | 88 ++++++++++ 27 files changed, 922 insertions(+) create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/.dockerignore create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/Dockerfile create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/README.md create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/acceptance-test-config.yml create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/acceptance-test-docker.sh create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/bootstrap.md create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/build.gradle create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/integration_tests/__init__.py create mode 100644
airbyte-integrations/connectors/source-amazon-sqs/integration_tests/abnormal_state.json create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/integration_tests/acceptance.py create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/integration_tests/catalog.json create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/integration_tests/configured_catalog.json create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/integration_tests/expected_records.txt create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/integration_tests/invalid_config.json create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/integration_tests/sample_config.json create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/integration_tests/sample_state.json create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/main.py create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/requirements.txt create mode 100755 airbyte-integrations/connectors/source-amazon-sqs/sample_files/configured_catalog.json create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/setup.py create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/source_amazon_sqs/__init__.py create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/source_amazon_sqs/source.py create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/source_amazon_sqs/spec.json create mode 100644 airbyte-integrations/connectors/source-amazon-sqs/unit_tests/unit_test.py create mode 100644 docs/integrations/sources/amazon-sqs.md diff --git a/airbyte-integrations/connectors/source-amazon-sqs/.dockerignore b/airbyte-integrations/connectors/source-amazon-sqs/.dockerignore new file mode 100644 index 000000000000..21c0b2142e14 --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/.dockerignore @@ -0,0 +1,7 @@ +* +!Dockerfile +!Dockerfile.test +!main.py +!source_amazon_sqs +!setup.py +!secrets diff --git a/airbyte-integrations/connectors/source-amazon-sqs/Dockerfile b/airbyte-integrations/connectors/source-amazon-sqs/Dockerfile new file mode 100644 index 000000000000..c49374e88a18 --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/Dockerfile @@ -0,0 +1,38 @@ +FROM python:3.7.11-alpine3.14 as base + +# build and load all requirements +FROM base as builder +WORKDIR /airbyte/integration_code + +# upgrade pip to the latest version +RUN apk --no-cache upgrade \ + && pip install --upgrade pip \ + && apk --no-cache add tzdata build-base + + +COPY setup.py ./ +# install necessary packages to a temporary folder +RUN pip install --prefix=/install . + +# build a clean environment +FROM base +WORKDIR /airbyte/integration_code + +# copy all loaded and built libraries to a pure basic image +COPY --from=builder /install /usr/local +# add default timezone settings +COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime +RUN echo "Etc/UTC" > /etc/timezone + +# bash is installed for more convenient debugging. 
+RUN apk --no-cache add bash + +# copy payload code only +COPY main.py ./ +COPY source_amazon_sqs ./source_amazon_sqs + +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/source-amazon-sqs diff --git a/airbyte-integrations/connectors/source-amazon-sqs/README.md b/airbyte-integrations/connectors/source-amazon-sqs/README.md new file mode 100644 index 000000000000..8e77f4bc5f1e --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/README.md @@ -0,0 +1,129 @@ +# Amazon Sqs Source + +This is the repository for the Amazon Sqs source connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/amazon-sqs). + +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.7.0` + +#### Build & Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:source-amazon-sqs:build +``` + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/amazon-sqs) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_amazon_sqs/spec.json` file. +Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source amazon-sqs test creds` +and place them into `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Build +First, make sure you build the latest Docker image: +``` +docker build . 
-t airbyte/source-amazon-sqs:dev +``` + +You can also build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:source-amazon-sqs:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/source-amazon-sqs:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-amazon-sqs:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-amazon-sqs:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-amazon-sqs:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` +## Testing +Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector). +#### Custom Integration tests +Place custom tests inside the `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +Customize the `acceptance-test-config.yml` file to configure tests. See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) for more information. +If your connector needs to create or destroy resources for use during acceptance tests, create fixtures for them and place them inside integration_tests/acceptance.py. +To run your custom integration tests together with the acceptance tests, from the connector root, run +``` +python -m pytest integration_tests -p integration_tests.acceptance +``` +To run the acceptance tests within Docker, run `./acceptance-test-docker.sh` from the connector directory. + +### Using gradle to run tests +All commands should be run from the airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:source-amazon-sqs:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:source-amazon-sqs:integrationTest +``` + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups: +* dependencies required for your connector to work go in the `MAIN_REQUIREMENTS` list. +* dependencies required for testing go in the `TEST_REQUIREMENTS` list. + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/source-amazon-sqs/acceptance-test-config.yml b/airbyte-integrations/connectors/source-amazon-sqs/acceptance-test-config.yml new file mode 100644 index 000000000000..ed9571848de1 --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/acceptance-test-config.yml @@ -0,0 +1,25 @@ +# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) +# for more information about how to configure these tests +connector_image: airbyte/source-amazon-sqs:dev +tests: + spec: + - spec_path: "source_amazon_sqs/spec.json" + connection: + - config_path: "secrets/config.json" + status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" + discovery: + - config_path: "secrets/config.json" + basic_read: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + empty_streams: [] + # expect_records: + # path: "integration_tests/expected_records.txt" + # extra_fields: no + # exact_order: no + # extra_records: yes + full_refresh: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-amazon-sqs/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-amazon-sqs/acceptance-test-docker.sh new file mode 100644 index 000000000000..e4d8b1cef896 --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/acceptance-test-docker.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env sh + +# Build latest connector image +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-) + +# Pull latest acctest image +docker pull airbyte/source-acceptance-test:latest + +# Run +docker run --rm -it \ -v /var/run/docker.sock:/var/run/docker.sock \ -v /tmp:/tmp \ -v $(pwd):/test_input \ airbyte/source-acceptance-test \ --acceptance-test-config /test_input + diff --git a/airbyte-integrations/connectors/source-amazon-sqs/bootstrap.md b/airbyte-integrations/connectors/source-amazon-sqs/bootstrap.md new file mode 100644 index 000000000000..42d2210a63d0 --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/bootstrap.md @@ -0,0 +1,26 @@ +# Amazon SQS Source + +## What +This is a connector for consuming messages from an [Amazon SQS Queue](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html). + +## How +### Polling +It uses [long polling](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html) to consume in batches +of up to 10 at a time (10 is the maximum defined by the AWS API). + +The batch size is configurable between 1 and 10 (a size of 0 would use short polling; this is not allowed). + +Using larger batches reduces the number of connections, thus increasing performance. + +### Deletes +Optionally, it can delete messages after reading - the delete_message() call is made __after__ yielding the message to the generator. +This means that messages aren't deleted unless read by a Destination - however, there is still the potential for +missed messages if the Destination fails __after__ taking the message, but before committing to its own downstream.
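+
+In sketch form, the read-then-delete ordering looks roughly like this (a simplified illustration of the pattern rather than the connector's exact code; `queue` is assumed to be a boto3 SQS `Queue` resource):
+
+```python
+def read_messages(queue, delete_messages: bool):
+    # Long poll in batches of up to 10 (the AWS API maximum).
+    while True:
+        messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=20)
+        if not messages:
+            break  # poll timed out with no messages; treat the queue as drained
+        for msg in messages:
+            # Yield first, so a Destination must take the record before we touch it...
+            yield {"id": msg.message_id, "body": msg.body}
+            # ...then delete. A failure downstream of the Destination after this
+            # point still loses the message, hence the caveat above.
+            if delete_messages:
+                msg.delete()
+```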
+ +### Credentials +Requires an AWS IAM Access Key ID and Secret Key. + +This could be improved to add support for configured AWS profiles, env vars, etc. + +### Output +Although messages are consumed in batches, they are output from the Source as individual messages. \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-amazon-sqs/build.gradle b/airbyte-integrations/connectors/source-amazon-sqs/build.gradle new file mode 100644 index 000000000000..fb66c996143c --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/build.gradle @@ -0,0 +1,14 @@ +plugins { + id 'airbyte-python' + id 'airbyte-docker' + id 'airbyte-source-acceptance-test' +} + +airbytePython { + moduleDirectory 'source_amazon_sqs' +} + +dependencies { + implementation files(project(':airbyte-integrations:bases:source-acceptance-test').airbyteDocker.outputs) + implementation files(project(':airbyte-integrations:bases:base-python').airbyteDocker.outputs) +} diff --git a/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/__init__.py b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/__init__.py new file mode 100644 index 000000000000..46b7376756ec --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/abnormal_state.json new file mode 100644 index 000000000000..810d9404fc2a --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/abnormal_state.json @@ -0,0 +1,5 @@ +{ + "ab-airbyte-testing": { + "todo-field-name": "todo-abnormal-value" + } +} diff --git a/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/acceptance.py new file mode 100644 index 000000000000..a1c52d74fe81 --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/acceptance.py @@ -0,0 +1,16 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+# + + +import pytest + +pytest_plugins = ("source_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """This fixture is a placeholder for external resources that the acceptance tests might require.""" + # TODO: setup test dependencies + yield + # TODO: clean up test dependencies diff --git a/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/catalog.json b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/catalog.json new file mode 100644 index 000000000000..88b177da8818 --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/catalog.json @@ -0,0 +1,24 @@ +{ + "streams": [ + { + "name": "ab-airbyte-testing", + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": false, + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "body": { + "type": "string" + }, + "attributes": { + "type": ["null", "object"] + } + } + } + } + ] +} diff --git a/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/configured_catalog.json new file mode 100644 index 000000000000..ee132a2e53a7 --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/configured_catalog.json @@ -0,0 +1,27 @@ +{ + "streams": [ + { + "sync_mode": "full_refresh", + "destination_sync_mode": "append", + "stream": { + "name": "ab-airbyte-testing", + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": false, + "json_schema": { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "body": { + "type": "string" + }, + "attributes": { + "type": ["null", "object"] + } + } + } + } + } + ] +} diff --git a/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/expected_records.txt b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/expected_records.txt new file mode 100644 index 000000000000..818b6ccde55b --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/expected_records.txt @@ -0,0 +1 @@ +{"type": "RECORD", "record": {"stream": "ab-airbyte-testing", "data": {"id": "ba0f237b-abf5-41ae-9d94-1dbd346f38dd", "body": "test 1", "attributes": null}, "emitted_at": 1633881878000}} diff --git a/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/invalid_config.json new file mode 100644 index 000000000000..87e0a0b097e6 --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/invalid_config.json @@ -0,0 +1,11 @@ +{ + "max_batch_size": 50, + "max_wait_time": 20, + "visibility_timeout": 120, + "delete_messages": false, + "attributes_to_return": "path,iter", + "queue_url": "https://sqs.eu-west-1.amazonaws.com/840836244599/ab-airbyte-testing", + "region": "eu-west-1", + "access_key": "xxx", + "secret_key": "xxx" +} diff --git a/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/sample_config.json b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/sample_config.json new file mode 100644 index 000000000000..b951f6e4d9ad --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/sample_config.json @@ -0,0 +1,11 @@ +{ + "max_batch_size": 10, + "max_wait_time": 20, + "visibility_timeout": 21, +
"delete_messages": false, + "attributes_to_return": "path,iter", + "queue_url": "https://sqs.eu-west-1.amazonaws.com/840836244599/ab-airbyte-testing", + "region": "eu-west-1", + "access_key": "xxx", + "secret_key": "xxx" +} diff --git a/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/sample_state.json new file mode 100644 index 000000000000..79aa7c0f3191 --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/integration_tests/sample_state.json @@ -0,0 +1,5 @@ +{ + "ab-airbyte-testing": { + "todo-field-name": "value" + } +} diff --git a/airbyte-integrations/connectors/source-amazon-sqs/main.py b/airbyte-integrations/connectors/source-amazon-sqs/main.py new file mode 100644 index 000000000000..44544efe727c --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/main.py @@ -0,0 +1,13 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +import sys + +from airbyte_cdk.entrypoint import launch +from source_amazon_sqs import SourceAmazonSqs + +if __name__ == "__main__": + source = SourceAmazonSqs() + launch(source, sys.argv[1:]) diff --git a/airbyte-integrations/connectors/source-amazon-sqs/requirements.txt b/airbyte-integrations/connectors/source-amazon-sqs/requirements.txt new file mode 100644 index 000000000000..7be17a56d745 --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/requirements.txt @@ -0,0 +1,3 @@ +# This file is autogenerated -- only edit if you know what you are doing. Use setup.py for declaring dependencies. +-e ../../bases/source-acceptance-test +-e . diff --git a/airbyte-integrations/connectors/source-amazon-sqs/sample_files/configured_catalog.json b/airbyte-integrations/connectors/source-amazon-sqs/sample_files/configured_catalog.json new file mode 100755 index 000000000000..ee132a2e53a7 --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/sample_files/configured_catalog.json @@ -0,0 +1,27 @@ +{ + "streams": [ + { + "sync_mode": "full_refresh", + "destination_sync_mode": "append", + "stream": { + "name": "ab-airbyte-testing", + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": false, + "json_schema": { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "body": { + "type": "string" + }, + "attributes": { + "type": ["null", "object"] + } + } + } + } + } + ] +} diff --git a/airbyte-integrations/connectors/source-amazon-sqs/setup.py b/airbyte-integrations/connectors/source-amazon-sqs/setup.py new file mode 100644 index 000000000000..2379bb4c07ce --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/setup.py @@ -0,0 +1,23 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = ["airbyte-cdk", "boto3"] + +TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "moto[sqs, iam]"] + +setup( + name="source_amazon_sqs", + description="Source implementation for Amazon Sqs.", + author="Alasdair Brown", + author_email="airbyte@alasdairb.com", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/source-amazon-sqs/source_amazon_sqs/__init__.py b/airbyte-integrations/connectors/source-amazon-sqs/source_amazon_sqs/__init__.py new file mode 100644 index 000000000000..9a0d695788b2 --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/source_amazon_sqs/__init__.py @@ -0,0 +1,8 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +from .source import SourceAmazonSqs + +__all__ = ["SourceAmazonSqs"] diff --git a/airbyte-integrations/connectors/source-amazon-sqs/source_amazon_sqs/source.py b/airbyte-integrations/connectors/source-amazon-sqs/source_amazon_sqs/source.py new file mode 100644 index 000000000000..280f0f9488b1 --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/source_amazon_sqs/source.py @@ -0,0 +1,165 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +import json +from datetime import datetime +from typing import Dict, Generator + +import boto3 +from airbyte_cdk.logger import AirbyteLogger +from airbyte_cdk.models import ( + AirbyteCatalog, + AirbyteConnectionStatus, + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, + Status, + Type, +) +from airbyte_cdk.sources.source import Source +from botocore.exceptions import ClientError + + +class SourceAmazonSqs(Source): + def delete_message(self, message): + try: + message.delete() + except ClientError: + raise Exception(f"Couldn't delete message: {message.message_id} - does your IAM user have sqs:DeleteMessage?") + + def change_message_visibility(self, message, visibility_timeout): + try: + message.change_visibility(VisibilityTimeout=visibility_timeout) + except ClientError: + raise Exception( + f"Couldn't change message visibility: {message.message_id} - does your IAM user have sqs:ChangeMessageVisibility?" + ) + + def parse_queue_name(self, url: str) -> str: + return url.rsplit("/", 1)[-1] + + def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus: + try: + if "max_batch_size" in config: + # Max batch size must be between 1 and 10 + if config["max_batch_size"] > 10 or config["max_batch_size"] < 1: + raise Exception("max_batch_size must be between 1 and 10") + if "max_wait_time" in config: + # Max wait time must be between 1 and 20 + if config["max_wait_time"] > 20 or config["max_wait_time"] < 1: + raise Exception("max_wait_time must be between 1 and 20") + + # Required properties + queue_url = config["queue_url"] + logger.debug("Amazon SQS Source Config Check - queue_url: " + queue_url) + queue_region = config["region"] + logger.debug("Amazon SQS Source Config Check - region: " + queue_region) + # Sensitive Properties + access_key = config["access_key"] + logger.debug("Amazon SQS Source Config Check - access_key (ends with): " + access_key[-1]) + secret_key = config["secret_key"] + logger.debug("Amazon SQS Source Config Check - secret_key (ends with): " + secret_key[-1]) + + logger.debug("Amazon SQS Source Config Check - Starting connection test ---") + session = boto3.Session(aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=queue_region) + sqs = session.resource("sqs") + queue = sqs.Queue(url=queue_url) + if hasattr(queue, "attributes"): + logger.debug("Amazon SQS Source Config Check - Connection test successful ---") + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + else: + return AirbyteConnectionStatus(status=Status.FAILED, message="Amazon SQS Source Config Check - Could not connect to queue") + except ClientError as e: + return AirbyteConnectionStatus(status=Status.FAILED, message=f"Amazon SQS Source Config Check - Error in AWS Client: {str(e)}") + except Exception as e: + return AirbyteConnectionStatus( + status=Status.FAILED, message=f"Amazon SQS Source Config Check - An exception occurred: {str(e)}" + ) + + def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog: + streams = [] + + # Get the queue name by getting substring after last / + stream_name = self.parse_queue_name(config["queue_url"]) + logger.debug("Amazon SQS Source Stream Discovery - stream is: " + stream_name) + + json_schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {"id": {"type": "string"}, "body": {"type": "string"}, "attributes": {"type": ["object", "null"]}}, + } + streams.append(AirbyteStream(name=stream_name, json_schema=json_schema, supported_sync_modes=["full_refresh"])) + return AirbyteCatalog(streams=streams) + + def read( + self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state: Dict[str, any] + ) -> Generator[AirbyteMessage, None, None]: + stream_name = self.parse_queue_name(config["queue_url"]) + logger.debug("Amazon SQS Source Read - stream is: " + stream_name) + + # Required properties + queue_url = config["queue_url"] + queue_region = config["region"] + delete_messages = config["delete_messages"] + + # Optional Properties + max_batch_size = config.get("max_batch_size", 10) + max_wait_time = config.get("max_wait_time", 20) + visibility_timeout = config.get("visibility_timeout") + attributes_to_return = config.get("attributes_to_return") + if attributes_to_return is None: + attributes_to_return = ["All"] + else: + attributes_to_return = attributes_to_return.split(",") + + # Sensitive Properties + access_key = config["access_key"] + secret_key = config["secret_key"] + + logger.debug("Amazon SQS Source Read - Creating SQS connection ---") + session = boto3.Session(aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=queue_region) + sqs = session.resource("sqs") + queue = sqs.Queue(url=queue_url) + logger.debug("Amazon SQS Source Read - Connected to SQS Queue ---") + timed_out = False + while not timed_out: + try: + logger.debug("Amazon SQS Source Read - Beginning message poll ---") + messages = queue.receive_messages( + MessageAttributeNames=attributes_to_return, MaxNumberOfMessages=max_batch_size, WaitTimeSeconds=max_wait_time + ) + + if not messages: + logger.debug("Amazon SQS Source Read - No messages received during poll, timeout reached ---") + timed_out = True + break + + for msg in messages: + logger.debug("Amazon SQS Source Read - Message received: " + msg.message_id) + if visibility_timeout: + logger.debug("Amazon SQS Source Read - Setting message visibility timeout: " + msg.message_id) + self.change_message_visibility(msg, visibility_timeout) + logger.debug("Amazon SQS Source Read - Message visibility timeout set: " + msg.message_id) + + data = { + "id": msg.message_id, + "body": msg.body, + "attributes": msg.message_attributes, + } + + # TODO: Support a 'BATCH OUTPUT' mode that outputs the full batch in a single AirbyteRecordMessage + yield AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=int(datetime.now().timestamp() * 1000)), + ) + if delete_messages: + logger.debug("Amazon SQS Source Read - Deleting message: " + msg.message_id) + self.delete_message(msg) + logger.debug("Amazon SQS Source Read - Message deleted: " + msg.message_id) + # TODO: Delete messages in batches to reduce the number of requests? + + except ClientError as error: + raise Exception("Error in AWS Client: " + str(error)) diff --git a/airbyte-integrations/connectors/source-amazon-sqs/source_amazon_sqs/spec.json b/airbyte-integrations/connectors/source-amazon-sqs/source_amazon_sqs/spec.json new file mode 100644 index 000000000000..3bddbaf43236 --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/source_amazon_sqs/spec.json @@ -0,0 +1,105 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/sources/amazon-sqs", + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Amazon SQS Source Spec", + "type": "object", + "required": ["queue_url", "region", "delete_messages"], + "additionalProperties": false, + "properties": { + "queue_url": { + "title": "Queue URL", + "description": "URL of the SQS Queue", + "type": "string", + "examples": [ + "https://sqs.eu-west-1.amazonaws.com/1234567890/my-example-queue" + ], + "order": 0 + }, + "region": { + "title": "AWS Region", + "description": "AWS Region of the SQS Queue", + "type": "string", + "enum": [ + "us-east-1", + "us-east-2", + "us-west-1", + "us-west-2", + "af-south-1", + "ap-east-1", + "ap-south-1", + "ap-northeast-1", + "ap-northeast-2", + "ap-northeast-3", + "ap-southeast-1", + "ap-southeast-2", + "ca-central-1", + "cn-north-1", + "cn-northwest-1", + "eu-central-1", + "eu-north-1", + "eu-south-1", + "eu-west-1", + "eu-west-2", + "eu-west-3", + "sa-east-1", + "me-south-1", + "us-gov-east-1", + "us-gov-west-1" + ], + "order": 1 + }, + "delete_messages": { + "title": "Delete Messages After Read", + "description": "If Enabled, messages will be deleted from the SQS Queue after being read. If Disabled, messages are left in the queue and can be read more than once. WARNING: Enabling this option can result in data loss in cases of failure; use with caution and see the documentation for more detail.", + "type": "boolean", + "default": false, + "order": 2 + }, + "max_batch_size": { + "title": "Max Batch Size", + "description": "Max number of messages to fetch in one batch (10 max)", + "type": "integer", + "examples": ["5"], + "order": 3 + }, + "max_wait_time": { + "title": "Max Wait Time", + "description": "Max time in seconds to wait for messages in a single poll (20 max)", + "type": "integer", + "examples": ["5"], + "order": 4 + }, + "attributes_to_return": { + "title": "Message Attributes To Return", + "description": "Comma separated list of Message Attribute names to return", + "type": "string", + "examples": ["attr1,attr2"], + "order": 5 + }, + "visibility_timeout": { + "title": "Message Visibility Timeout", + "description": "Modify the Visibility Timeout of the individual message from the Queue's default (seconds).", + "type": "integer", + "examples": ["15"], + "order": 6 + }, + "access_key": { + "title": "AWS IAM Access Key ID", + "description": "The Access Key ID of the AWS IAM Role to use for pulling messages", + "type": "string", + "examples": ["xxxxxHRNxxx3TBxxxxxx"], + "airbyte_secret": true, + "order": 7 + }, + "secret_key": { + "title": "AWS IAM Secret Key", + "description": "The Secret Key of the AWS IAM Role to use for pulling messages", + "type": "string", + "examples": ["hu+qE5exxxxT6o/ZrKsxxxxxxBhxxXLexxxxxVKz"], + "airbyte_secret": true, + "order": 8 + } + } + } +} diff --git a/airbyte-integrations/connectors/source-amazon-sqs/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-amazon-sqs/unit_tests/unit_test.py new file mode 100644 index 000000000000..fdaeed172a07 --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-sqs/unit_tests/unit_test.py @@ -0,0 +1,130 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+# + +import json +from typing import Any, Mapping + +import boto3 +from airbyte_cdk.logger import AirbyteLogger +from airbyte_cdk.models import ConfiguredAirbyteCatalog, Status + +# from airbyte_cdk.sources.source import Source +from moto import mock_iam, mock_sqs +from moto.core import set_initial_no_auth_action_count +from source_amazon_sqs import SourceAmazonSqs + + +@mock_iam +def create_user_with_all_permissions(): + client = boto3.client("iam", region_name="eu-west-1") + client.create_user(UserName="test_user1") + + policy_document = { + "Version": "2012-10-17", + "Statement": [{"Effect": "Allow", "Action": ["sqs:*"], "Resource": "*"}], + } + + client.put_user_policy( + UserName="test_user1", + PolicyName="policy1", + PolicyDocument=json.dumps(policy_document), + ) + + return client.create_access_key(UserName="test_user1")["AccessKey"] + + +def create_config(queue_url, access_key, secret_key, queue_region, delete_message): + return { + "delete_messages": delete_message, + "queue_url": queue_url, + "region": queue_region, + "access_key": access_key, + "secret_key": secret_key, + "max_wait_time": 5, + "visibility_timeout": 120, + } + + +def get_catalog() -> Mapping[str, Any]: + with open("sample_files/configured_catalog.json", "r") as f: + return json.load(f) + + +@set_initial_no_auth_action_count(3) +@mock_sqs +@mock_iam +def test_check(): + # Create User + user = create_user_with_all_permissions() + # Create Queue + queue_name = "amazon-sqs-mock-queue" + queue_region = "eu-west-1" + client = boto3.client( + "sqs", aws_access_key_id=user["AccessKeyId"], aws_secret_access_key=user["SecretAccessKey"], region_name=queue_region + ) + queue_url = client.create_queue(QueueName=queue_name)["QueueUrl"] + # Create config + config = create_config(queue_url, user["AccessKeyId"], user["SecretAccessKey"], queue_region, False) + # Create AirbyteLogger + logger = AirbyteLogger() + # Create Source + source = SourceAmazonSqs() + # Run check + status = source.check(logger, config) + assert status.status == Status.SUCCEEDED + + +@mock_sqs +def test_discover(): + # Create Queue + queue_name = "amazon-sqs-mock-queue" + queue_region = "eu-west-1" + client = boto3.client("sqs", region_name=queue_region) + queue_url = client.create_queue(QueueName=queue_name)["QueueUrl"] + # Create config + config = create_config(queue_url, "xxx", "xxx", queue_region, False) + # Create AirbyteLogger + logger = AirbyteLogger() + # Create Source + source = SourceAmazonSqs() + # Run discover + catalog = source.discover(logger, config) + assert catalog.streams[0].name == queue_name + + +@set_initial_no_auth_action_count(3) +@mock_sqs +@mock_iam +def test_read(): + # Create User + user = create_user_with_all_permissions() + # Create Queue + queue_name = "amazon-sqs-mock-queue" + queue_region = "eu-west-1" + client = boto3.client( + "sqs", aws_access_key_id=user["AccessKeyId"], aws_secret_access_key=user["SecretAccessKey"], region_name=queue_region + ) + + queue_url = client.create_queue(QueueName=queue_name)["QueueUrl"] + # Create config + config = create_config(queue_url, user["AccessKeyId"], user["SecretAccessKey"], queue_region, False) + # Create ConfiguredAirbyteCatalog + catalog = ConfiguredAirbyteCatalog(streams=get_catalog()["streams"]) + # Create AirbyteLogger + logger = AirbyteLogger() + # Create empty state (the source is full refresh only and does not use it) + state = {} + # Create Source + source = SourceAmazonSqs() + # Send test message + test_message = "UNIT_TEST_MESSAGE" + client.send_message(QueueUrl=queue_url, MessageBody=test_message) + # Run read + for message in source.read(logger, config, catalog, state): + record = message.record + stream = record.stream + assert stream == queue_name + data = record.data + data_body = data["body"] + assert data_body == test_message diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 0f6a73a43503..0c9eb152fb68 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -29,6 +29,7 @@ * [Scaling Airbyte](operator-guides/scaling-airbyte.md) * [Connector Catalog](integrations/README.md) * [Sources](integrations/sources/README.md) + * [Amazon SQS](integrations/sources/amazon-sqs.md) * [Amazon Seller Partner](integrations/sources/amazon-seller-partner.md) * [Amazon Ads](integrations/sources/amazon-ads.md) * [Amplitude](integrations/sources/amplitude.md) diff --git a/docs/integrations/README.md b/docs/integrations/README.md index 715d13a997c0..ce62c2badd75 100644 --- a/docs/integrations/README.md +++ b/docs/integrations/README.md @@ -14,6 +14,7 @@ Airbyte uses a grading system for connectors to help users understand what to ex | Connector | Grade | | :--- | :--- | +| [Amazon SQS](sources/amazon-sqs.md) | Alpha | | [Amazon Seller Partner](sources/amazon-seller-partner.md) | Alpha | | [Amplitude](sources/amplitude.md) | Beta | | [Apify Dataset](sources/apify-dataset.md) | Alpha | diff --git a/docs/integrations/sources/amazon-sqs.md b/docs/integrations/sources/amazon-sqs.md new file mode 100644 index 000000000000..afce915d823f --- /dev/null +++ b/docs/integrations/sources/amazon-sqs.md @@ -0,0 +1,88 @@ +# Amazon SQS + +## Sync overview + +This source will sync messages from an [SQS Queue](https://docs.aws.amazon.com/sqs/index.html). + +### Output schema + +This source will output one stream for the configured SQS Queue. +The stream record data will have three fields: +* id (a UUIDv4 as a STRING) +* body (message body as a STRING) +* attributes (attributes of the messages as an OBJECT or NULL) + +### Features + +| Feature | Supported?\(Yes/No\) | Notes | | :--- | :--- | :--- | | Full Refresh Sync | yes | | | Incremental Sync | no | | | Namespaces | no | | + +### Performance considerations + +Messages are consumed with long polling in batches of up to 10; larger batches reduce the number of SQS requests per message. + +## Getting started + +### Requirements + +* AWS IAM Access Key +* AWS IAM Secret Key +* AWS SQS Queue + +### Properties + +Required properties are 'Queue URL', 'AWS Region' and 'Delete Messages After Read' as noted in **bold** below. + +* **Queue URL** (STRING) + * The full AWS endpoint URL of the queue e.g. https://sqs.eu-west-1.amazonaws.com/1234567890/example-queue-url +* **AWS Region** (STRING) + * The region code for the SQS Queue e.g. eu-west-1 +* **Delete Messages After Read** (BOOLEAN) + * **WARNING:** Setting this option to TRUE can result in data loss; do not enable this option unless you understand the risk. See the **Data loss warning** section below. + * Should the message be deleted from the SQS Queue after being read? This prevents the message from being read more than once + * By default, messages are NOT deleted and can thus be re-read after the `Message Visibility Timeout` + * Default: False +* Max Batch Size (INTEGER) + * The maximum number of messages to consume in a single poll, e.g. 5 + * Minimum of 1, maximum of 10 + * Default: 10 +* Max Wait Time (INTEGER) + * The maximum time (in seconds) to poll for messages before committing a batch (or timing out), unless the batch fills first (as per `Max Batch Size`) + * Minimum of 1, maximum of 20 + * Default: 20 +* Message Attributes To Return (STRING) + * A comma separated list of Attributes to return for each message + * Default: All +* Message Visibility Timeout (INTEGER) + * After a message is read, how much time (in seconds) the message should be hidden from other consumers + * After this timeout, the message is not deleted and can be re-read + * Default: 30 +* AWS IAM Access Key ID (STRING) + * The Access Key for the IAM User with permissions on this Queue + * If `Delete Messages After Read` is `false` then only `sqs:ReceiveMessage` is needed + * If `Delete Messages After Read` is `true` then `sqs:DeleteMessage` is also needed +* AWS IAM Secret Key (STRING) + * The Secret Key for the IAM User with permissions on this Queue + +### Data loss warning + +When enabling **Delete Messages After Read**, the Source will delete messages from the SQS Queue after reading them. The message is deleted *after* the configured Destination takes the message from this Source, but this makes no guarantee that the downstream destination has committed/persisted the message. This means that it is possible for the Airbyte Destination to read the message from the Source, the Source deletes the message, and then the downstream application fails - resulting in the message being lost permanently. + +Extra care should be taken to understand this risk before enabling this option. + +### Setup guide + +* [Create IAM Keys](https://aws.amazon.com/premiumsupport/knowledge-center/create-access-key/) +* [Create SQS Queue](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-getting-started.html#step-create-queue) + +> **NOTE**: +> * If `Delete Messages After Read` is `false` then the IAM User needs only `sqs:ReceiveMessage` in the AWS IAM Policy +> * If `Delete Messages After Read` is `true` then both `sqs:ReceiveMessage` and `sqs:DeleteMessage` are needed in the AWS IAM Policy + +## CHANGELOG + +| Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | | `0.1.0` | 2021-10-10 | [\#0000](https://github.com/airbytehq/airbyte/pull/0000) | `Initial version` |
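+
+As a quick way to sanity-check the IAM keys and Queue URL outside of Airbyte, a short boto3 snippet along these lines can help (an illustrative sketch only; substitute your own region, credentials, and queue URL):
+
+```python
+import boto3
+
+# Illustrative check: prints the approximate queue depth if the credentials and URL are valid.
+session = boto3.Session(
+    aws_access_key_id="<ACCESS_KEY_ID>",
+    aws_secret_access_key="<SECRET_KEY>",
+    region_name="eu-west-1",
+)
+queue = session.resource("sqs").Queue("https://sqs.eu-west-1.amazonaws.com/1234567890/my-example-queue")
+print(queue.attributes.get("ApproximateNumberOfMessages"))
+```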