From ac84381521088426f8fc65e3f6d622ffb19f4bc0 Mon Sep 17 00:00:00 2001 From: Dmytro Rezchykov Date: Wed, 22 Sep 2021 15:24:49 +0300 Subject: [PATCH 1/3] S3 source: add custom endpoint --- .../69589781-7828-43c5-9f63-8925b1c1ccc2.json | 2 +- .../resources/seed/source_definitions.yaml | 2 +- .../source_acceptance_test/plugin.py | 3 + .../connectors/source-s3/Dockerfile | 2 +- .../source-s3/acceptance-test-config.yml | 35 ++++++-- .../source-s3/integration_tests/acceptance.py | 25 ++++++ .../integration_tests/config_minio.json | 16 ++++ .../expected_records_custom_server.txt | 55 +++++++++++++ .../integration_tests/minio_data.zip | Bin 0 -> 13376 bytes .../source-s3/integration_tests/spec.json | 22 ++++- .../connectors/source-s3/source_s3/s3file.py | 21 +++-- .../connectors/source-s3/source_s3/source.py | 6 +- .../source_files_abstract/__init__.py | 75 +++++++++++++++++- .../source_s3/source_files_abstract/source.py | 2 +- .../source_s3/source_files_abstract/spec.py | 12 +-- .../source_s3/source_files_abstract/stream.py | 7 +- .../connectors/source-s3/source_s3/stream.py | 30 +++---- docs/integrations/sources/s3.md | 6 +- 18 files changed, 263 insertions(+), 58 deletions(-) create mode 100644 airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json create mode 100644 airbyte-integrations/connectors/source-s3/integration_tests/expected_records_custom_server.txt create mode 100644 airbyte-integrations/connectors/source-s3/integration_tests/minio_data.zip diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json index a682a2d9ca6f..f8417079b2c0 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json +++ 
b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/69589781-7828-43c5-9f63-8925b1c1ccc2.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "69589781-7828-43c5-9f63-8925b1c1ccc2", "name": "S3", "dockerRepository": "airbyte/source-s3", - "dockerImageTag": "0.1.4", + "dockerImageTag": "0.1.5", "documentationUrl": "https://docs.airbyte.io/integrations/sources/s3" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 5321c4e0d4c3..22d24805664e 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -85,7 +85,7 @@ - sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2 name: S3 dockerRepository: airbyte/source-s3 - dockerImageTag: 0.1.4 + dockerImageTag: 0.1.5 documentationUrl: https://docs.airbyte.io/integrations/sources/s3 sourceType: file - sourceDefinitionId: fbb5fbe2-16ad-4cf4-af7d-ff9d9c316c87 diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/plugin.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/plugin.py index c414d3497428..ca7fdd2eb3cb 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/plugin.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/plugin.py @@ -100,6 +100,9 @@ def pytest_collection_modifyitems(config, items): i += len(inner_items) for items in packed_items: + if not hasattr(items[0].cls, "config_key"): + # Skip user defined test classes from integration_tests/ directory. 
+ continue test_configs = getattr(config.tests, items[0].cls.config_key()) for test_config, item in zip(test_configs, items): default_timeout = item.get_closest_marker("default_timeout") diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile index fc6a74b4e12f..0dc3dde2ff6e 100644 --- a/airbyte-integrations/connectors/source-s3/Dockerfile +++ b/airbyte-integrations/connectors/source-s3/Dockerfile @@ -17,7 +17,7 @@ COPY source_s3 ./source_s3 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL io.airbyte.name=airbyte/source-s3 diff --git a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml index 9c32b9f7015a..61817bd04ce9 100644 --- a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml @@ -6,18 +6,21 @@ tests: - spec_path: "integration_tests/spec.json" connection: # for CSV format - - config_path: "secrets/config.json" + - config_path: "integration_tests/config_minio.json" status: "succeed" # for Parquet format - config_path: "secrets/parquet_config.json" status: "succeed" - - config_path: "integration_tests/invalid_config.json" - status: "failed" + # for custom server + - config_path: "integration_tests/config_minio.json" + status: "succeed" discovery: - # for CSV format + # for CSV format - config_path: "secrets/config.json" # for Parquet format - config_path: "secrets/parquet_config.json" + # for custom server + - config_path: "integration_tests/config_minio.json" basic_read: # for CSV format - config_path: "secrets/config.json" @@ -29,17 +32,32 @@ tests: configured_catalog_path: "integration_tests/parquet_configured_catalog.json" expect_records: path: 
"integration_tests/parquet_expected_records.txt" - incremental: + # for custom server + - config_path: "integration_tests/config_minio.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + # expected records contains _ab_source_file_last_modified property which + # is modified all the time s3 file changed and for custom server it is + # file creating date and it always new. Uncomment this line when SAT + # would have ability to ignore specific fields from expected records. + # expect_records: + # path: "integration_tests/expected_records_custom_server.txt.txt" + incremental: # for CSV format - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" - cursor_paths: + cursor_paths: test: ["_ab_source_file_last_modified"] future_state_path: "integration_tests/abnormal_state.json" # for Parquet format - config_path: "secrets/parquet_config.json" configured_catalog_path: "integration_tests/parquet_configured_catalog.json" - cursor_paths: + cursor_paths: + test: ["_ab_source_file_last_modified"] + future_state_path: "integration_tests/abnormal_state.json" + # for custom server + - config_path: "integration_tests/config_minio.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + cursor_paths: test: ["_ab_source_file_last_modified"] future_state_path: "integration_tests/abnormal_state.json" @@ -50,3 +68,6 @@ tests: # for Parquet format - config_path: "secrets/parquet_config.json" configured_catalog_path: "integration_tests/parquet_configured_catalog.json" + # for custom server + - config_path: "integration_tests/config_minio.json" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-s3/integration_tests/acceptance.py index d6cbdc97c495..a9c2d0e6b0b3 100644 --- 
a/airbyte-integrations/connectors/source-s3/integration_tests/acceptance.py +++ b/airbyte-integrations/connectors/source-s3/integration_tests/acceptance.py @@ -23,6 +23,11 @@ # +import shutil +import tempfile +from zipfile import ZipFile + +import docker import pytest pytest_plugins = ("source_acceptance_test.plugin",) @@ -32,3 +37,23 @@ def connector_setup(): """ This fixture is a placeholder for external resources that acceptance test might require.""" yield + + +@pytest.fixture(scope="session", autouse=True) +def minio_setup(): + """ This fixture is a placeholder for external resources that acceptance test might require.""" + client = docker.from_env() + tmp_dir = tempfile.mkdtemp() + with ZipFile("./integration_tests/minio_data.zip") as archive: + archive.extractall(tmp_dir) + + container = client.containers.run( + "minio/minio", + f"server {tmp_dir}/minio_data", + network_mode="host", + volumes=["/tmp:/tmp", "/var/run/docker.sock:/var/run/docker.sock"], + detach=True, + ) + yield + shutil.rmtree(tmp_dir) + container.stop() diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json new file mode 100644 index 000000000000..fa6158eeb40e --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json @@ -0,0 +1,16 @@ +{ + "dataset": "test", + "provider": { + "storage": "S3", + "bucket": "test-bucket", + "aws_access_key_id": "123456", + "aws_secret_access_key": "123456key", + "path_prefix": "", + "endpoint": "http://localhost:9000" + }, + "format": { + "filetype": "csv" + }, + "path_pattern": "*.csv", + "schema": "{}" +} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records_custom_server.txt b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records_custom_server.txt new file mode 100644 index 000000000000..ef52db86be11 --- /dev/null +++ 
b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records_custom_server.txt @@ -0,0 +1,55 @@ +{"stream": "test", "data": {"Year": 1960, "Value": 59184116488.9977, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1961, "Value": 49557050182.9631, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1962, "Value": 46685178504.3274, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1963, "Value": 50097303271.0232, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1964, "Value": 59062254890.1871, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1965, "Value": 69709153115.3147, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1966, "Value": 75879434776.1831, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1967, "Value": 72057028559.6741, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": 
"test", "data": {"Year": 1968, "Value": 69993497892.3132, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1969, "Value": 78718820477.9257, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1970, "Value": 91506211306.3745, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1971, "Value": 98562023844.1813, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1972, "Value": 112159813640.376, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1973, "Value": 136769878359.668, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1974, "Value": 142254742077.706, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1975, "Value": 161162492226.686, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1976, "Value": 151627687364.405, "_ab_additional_properties": {}, "_ab_source_file_last_modified": 
"2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1977, "Value": 172349014326.931, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1978, "Value": 148382111520.192, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1979, "Value": 176856525405.729, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1980, "Value": 189649992463.987, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1981, "Value": 194369049090.197, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1982, "Value": 203549627211.606, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1983, "Value": 228950200773.115, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1984, "Value": 258082147252.256, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": 
{"Year": 1985, "Value": 307479585852.339, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1986, "Value": 298805792971.544, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1987, "Value": 271349773463.863, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1988, "Value": 310722213686.031, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1989, "Value": 345957485871.286, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1990, "Value": 358973230048.399, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1991, "Value": 381454703832.753, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1992, "Value": 424934065934.066, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1993, "Value": 442874596387.119, "_ab_additional_properties": {}, "_ab_source_file_last_modified": 
"2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1994, "Value": 562261129868.774, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1995, "Value": 732032045217.766, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1996, "Value": 860844098049.121, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1997, "Value": 958159424835.34, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1998, "Value": 1025276902078.73, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 1999, "Value": 1089447108705.89, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2000, "Value": 1205260678391.96, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2001, "Value": 1332234719889.82, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": 
{"Year": 2002, "Value": 1461906487857.92, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2003, "Value": 1649928718134.59, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2004, "Value": 1941745602165.09, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2005, "Value": 2268598904116.28, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2006, "Value": 2729784031906.09, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2007, "Value": 3523094314820.9, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2008, "Value": 4558431073438.2, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2009, "Value": 5059419738267.41, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2010, "Value": 6039658508485.59, "_ab_additional_properties": {}, "_ab_source_file_last_modified": 
"2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2011, "Value": 7492432097810.11, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2012, "Value": 8461623162714.07, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2013, "Value": 9490602600148.49, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} +{"stream": "test", "data": {"Year": 2014, "Value": 10354831729340.4, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-09-23T11:48:44+0000", "_ab_source_file_url": "china_gdp.csv"}, "emitted_at": 1632398440000} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/minio_data.zip b/airbyte-integrations/connectors/source-s3/integration_tests/minio_data.zip new file mode 100644 index 0000000000000000000000000000000000000000..74f6ba2389448f0c1f3e9f359cb74f95c1241bce GIT binary patch literal 13376 zcmb_j2{_bU7a!YX&6=gMFIk7|dnU5e7|N2ej-9~_DqFG_StC**q$#_!AR&<@6-AMv zvh;<@i%_p`CYAZiF!avzT*LG9oZmUWd(OG{o^yg3kdae^a3AR?1=AnD{QN-!+6nS> z^>RhpING7@BoCNTfygE!4efBt?YDpg7 zX+t~QA}CKj2q=$-pgd1M545YdoiBPxvM}1vP7b@4B;!mX$Nj)*%7Xo-1mQF-K3J1) ziTw+OUAzP(J0X2N?a&Z66w>Pl2?UhBHVlWsL=7ZFAGLRymcwDvW=~0C^WL+)7fL%p z{`3?tUxo+2O*pc1z31%E_;b zv*e{XLoUxV0!TMtUPXTpf-%+OJFG>Y*dGA&iZ> zMg{-7;VVi1Y9*iNS^gpD)7BF~HXqjo@1|Fl0AG0}73j z*eDgPZB_q>VQIiEqfMe&NtqusGM3fVfy(cO>Pjif?lsf<8*;E|42ASSKpnBi(c1h7 z5~DGemQ8x0GO^-UV_;p(5xvGzWlzS(j$1{r!ky|?J*6nSGq2FIIZ z5v$Dpv)$&~OAbNNH&D_y`${Ji&K)^(y?#7Efr%V;5#&dTB;$ajR!6EDEw+8i8Ow%@ zr5jXF8#pn|+mp_?u#Pjwl^id 
z9b&W{HhfKoiZoxdQEg3<7Vutp*Ks&^F~4rNJ!WU~KsUrKO=DUsyS3}FaK+Kkaxrhy zooM%Cqug0BEK0IM&Mp3GG*dA(q%x!1D_=1n!ds0m?cvaylf5B+TW4f|#QD<48Uv4V z9gER~-u;sm=6U(7=h-_$JCQzt+aK`9mdxAf^v9G%X33N7DLwT}k?p)d@g8mN%lntF zH@JgC?Zcf^2n7g~CVB_uWY{!dfon7mC4%4AYDW|gPxcSf*X8zum$Zb zYB@sT;yK%(d_cIXYhKo;rWMRmANKi%A8+m35$UVf&(hoXSoD~w`EX|?+u7GyiBUl< z`DdPcKAHR1*ruTUD<(bs={dn?i~6DgH%(Il2VOU+sT7s`bMufbl~KXT-o1Atm9(e_Zhesw%}_t0D+nUOHZZuZG~RKkq4(%?-DYr?46Ksj2so0&9v)}X;F z;r!E=;}l`hLf49#djWInDBV(3)6 zTc;Cd0=669KT0xRnZg>8rI;1gsg`I>;UyMR9rz{mkw_{=SV@N$EDqMqptL!305jQ9 zYf7Ry=3$yjg?xeWA64*p8>M}xYHMR@2KgLscI=k1xAvS{?QT4;vM9@c$E%uiKs{#RvHY8t zhr}M93x%E1DonfbUR&PpoXK@z9`(#J5na6p-1`pX$CW1@U=|ops$y@p-|y}%L@Rwh z;8PNU-N%yb2`$Mk^DSUM3+b58e4^8N$nRD8+z@SJlXLYj!IKv&nzSjL!_)6ILJp)f zxZl5HlbKh-8hQ4Dq`g^m?qli;P_+PC44<$LRg1I&vcv)v(@vT2#!*E6P14{eDc=Kgj*;!v1=Ed80cN7+CPfwIwK?vKhMs(U#uy9(x=+#JPgpE5{PQ-!dnNSFY+j1rpmp>?rx4qumn9Y-wEO_OCacYly5~`>L z=5Q}Az^&WtOZ7oBe^}=ggMm9QT;3ZrcpL7nDGUBKF8QmcGiP3+cY~tsX3M`$Em9K+IGi29?t~x!2=!?b}@D z>}!W9+3rs<7Sdk&?A;kH#66|*KaQCJ_#Q!?0=g&f3!0%`ne2I{V@~n0(|t--Y`OJR z^fyBk`g|`A4ppF_!&hBt%p}-ot>eyoE<46w)m#?LxJ%)T(B4bC#yxo2&saa=a4?aH zzoVj#tiH8~WOlr50?feehx#Vtvnx1GxIcbtfnavOd9EkwCphreh+l36mzO6kP&V7a>)^v|JT zsfR_pk4U;~FS;a{8DF1RqoPC>zyI*!a9>C0@{45U!FvRYSK>VFGdp83G@k7};BP1N`99$3(dsnZed(s+AE*AtwuOK~+V=t;{ z*roTMU^}6fk5mFG2*yFThV!s!&1Maal*BW#kR!wAx52x$Do<~pIN}m8HQ+dG2l`qd zGw;w$lGtM;Q7Y>1D_c8Mi}((DS;}>#&Ki23+M8CV_ zTUes#uE)~NpfwrPp-W7H7V$tV3y1mRsLH{+|Xaou^u@qkGvc2>MUYUc8H^VkQ>|aEhh~bp*uSh%a zpX))C8?-G&T~JfU;T>e^palgNsICdq6NuZ_*&{DsWyxgR<^bItPttt(nMBxXz`xh5 z`e=2{mm5iUNsO$!7>#Wh%Sf^ua+QO`sf<4w&B!x?OYu$@=y2sO4VGI&8H(qH(l zu#&6)TX(Brze`cK)O4+;@bywj!chm^3=t4}4P|*XjL|H?61-FtQ~3o zSXo6QBCSmwq1-)4gJXR;(dW~>-yZ!l=@72n{W`o>`uSStro*#d%wl?EeNtr3u~_FL zRv~Rnb@%(bBj?4w!^QNo#8Oajj)QXotmzS~c>-y9TqAY}&`GUJ?FypFeu!(~6*4rc zaAuVB)Khv1KV!p3x}H{tldY_C&zogtU9gf4ZrGQm85h^Z!4AE{zICTn;htc=a?)vP4RdP|tYm2e1>Gv@O;_=VFQVeu0IX;Gii>sHNjkBXS!~x~M?kThEZ^JW`VF}pqs==;3e}PCuLg9>W 
zV+!Y?-8UOTk#gh4sWkO}!7eOUC^=(|4>gA(z0(6H_z&@tUst@c8+??~1#>4E=H$lt z(%eeu!FkYlv|_@$a&ln`cJ3Ekp)J`?<7nS=77W!%in$&dW*PTA5%gc<)6Ji^2f zGV0NFyvE^DqvmkAd4c&W)7c`~qqP=ScxL*!)RatL7fp>2V<+EyRLBu4r#t4Qb*~wtibCcDGh8zs z#tLT49b<^4w7JAjdp=0I3!Z|@TS?>!3%d~cLg^>sv{>n|oi6Z)N9_hWtbQE^5RABn zjK8L?{c&YX!*p2woM3E=*Gr~UtHkiJym4dOgeU>l{3~fJ{O-N+`yMFV5Mbn^S<#I9 z=p#ZNh~+9Ay9n1sSBr#Aq-Q%p8OL|M&p()| zXf8C8SAPAAo;ZX`LRU{*Pwi1-D~LQ+Jp-A#-#D{YSy83p9>3h~T{4p2AHzrPzl-bE zcGN7`f2+keCdo&cj9-_nF-+6ou=MGQ)4SNq>rRubFqiI0g9NrVhwVgyKoaY^#)?CJ zO@~07Sn6}G-uPexV&O;o52*yYL(6`Rt1MoZw+izn1%Vo{=ZOGefq?X}mUIa-JGHJIwLq?EOmNTnxfB*eBzfu;U#h#i_&8zmb!cetSQ@O@xbR^6C$HHY z{jv-R5#^Fu0M9|V-z)W4PPlYR!Go%(HpX@aPgjpQw-jsT%guJEFJOhvQ!DGYqlxU44L`60i8$~_uzU}jO`h(-`9iG8Nl z>)N}^B+r_f<5dy@lJtZpc+Va!?{@G$RT?%JGa~CBNZTU7M3o}tPdD2(DU`$9AeL0% zJ=F-46{L*O_+X(ezt!_3xh~alK8BA8pem}$y@uI{@L4)=pQC*4=V;BL7m~tseGG2l z6uRA_#{Yx$4tr5*U4pkcMDrz^oD!qHFDdyP`>S*H;Q?Nmk+<$kgYD`+IusoA%y zqk_JZ!3;=9xyV)z5E>9WHY9N$2GDmX?i(-#L9sLi@yEo%vcQ$kELMQPeL#VLY7AC3 zu12ti>_6urmX#fco+T(7e>wt-f}4)`V=@9~!SWFfoQ+sQ!JcBE%CJd-D1gTO`o`cg z30Btc4<~@A%UCo#=JAO7*|9iaw*FqMgy020%&!NrH0VGa3~@T|jhP4xe_^Xq@s)W*wGQuq-ue2}ue=~|0jNJhPYCVT+vi6%S?8j# z5<>)lQME1x-f_DbxZMCa4utF=cSvpEqZoV3TmhYQa=T~oMEftm-0d{uq89_5`nvo?H{xdS% znKz*c@X@>$8LRJyF!!C datetime: @@ -79,9 +77,9 @@ def last_modified(self) -> datetime: if self.use_aws_account(self._provider): raise nce else: - return boto3.client("s3", config=ClientConfig(signature_version=UNSIGNED)).head_object(Bucket=bucket, Key=self.url)[ - "LastModified" - ] + return make_s3_client(self._provider, config=ClientConfig(signature_version=UNSIGNED)).head_object( + Bucket=bucket, Key=self.url + )["LastModified"] @staticmethod def use_aws_account(provider: dict) -> bool: @@ -101,12 +99,11 @@ def open(self, binary: bool) -> Iterator[Union[TextIO, BinaryIO]]: bucket = self._provider.get("bucket") if self.use_aws_account(self._provider): - aws_access_key_id = self._provider.get("aws_access_key_id", "") - aws_secret_access_key = 
self._provider.get("aws_secret_access_key", "") - result = smart_open.open(f"s3://{aws_access_key_id}:{aws_secret_access_key}@{bucket}/{self.url}", mode=mode) + params = {"client": make_s3_client(self._provider, session=self._boto_session)} + result = smart_open.open(f"s3://{bucket}/{self.url}", transport_params=params, mode=mode) else: config = ClientConfig(signature_version=UNSIGNED) - params = {"client": boto3.client("s3", config=config)} + params = {"client": make_s3_client(self._provider, config=config)} result = smart_open.open(f"s3://{bucket}/{self.url}", transport_params=params, mode=mode) # see https://docs.python.org/3/library/contextlib.html#contextlib.contextmanager for why we do this diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source.py b/airbyte-integrations/connectors/source-s3/source_s3/source.py index b4c8ea728769..c0fd5207dfc5 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source.py @@ -56,7 +56,11 @@ class Config: description="By providing a path-like prefix (e.g. myFolder/thisTable/) under which all the relevant files sit, we can optimise finding these in S3. This is optional but recommended if your bucket contains many folders/files.", ) - provider: S3Provider = Field(...) 
+ endpoint: str = Field("", description="Endpoint to S3 compatable service, leave empty for default Amazon servers") + use_ssl: bool = Field(default=None, description="Is remove server using secure SSL/TLS connection") + verify_ssl_cert: bool = Field(default=None, description="Allow self signed certificates") + + provider: S3Provider class SourceS3(SourceFilesAbstract): diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/__init__.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/__init__.py index c52966eb454b..4e88ebd87316 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/__init__.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/__init__.py @@ -1,3 +1,26 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# """ MIT License @@ -22,6 +45,56 @@ SOFTWARE. 
""" +import boto3.session +from botocore.client import Config + from .source import SourceFilesAbstract -__all__ = ["SourceFilesAbstract"] + +def make_s3_resource(provider: dict, session: boto3.session.Session, config: Config = None) -> object: + """ + Construct boto3 resource with specified config and remote endpoint + :param provider provider configuration from connector configuration. + :param session User session to create client from. + :param config Client config parameter in case of using creds from .aws/config file. + :return Boto3 S3 resource instance. + """ + client_kv_args = _get_s3_client_args(provider, config) + return session.resource("s3", **client_kv_args) + + +def make_s3_client(provider: dict, session: boto3.session.Session = None, config: Config = None) -> object: + """ + Construct boto3 client with specified config and remote endpoint + :param provider provider configuration from connector configuration. + :param session User session to create client from. Default boto3 sesion in case of session not specified. + :param config Client config parameter in case of using creds from .aws/config file. + :return Boto3 S3 client instance. + """ + client_kv_args = _get_s3_client_args(provider, config) + if session is None: + return boto3.client("s3", **client_kv_args) + else: + return session.client("s3", **client_kv_args) + + +def _get_s3_client_args(provider: dict, config: Config) -> dict: + """ + Returns map of args used for creating s3 boto3 client. + :param provider provider configuration from connector configuration. + :param config Client config parameter in case of using creds from .aws/config file. + :return map of s3 client arguments. + """ + client_kv_args = {"config": config} + endpoint = provider.get("endpoint") + if endpoint: + # endpoint could be None or empty string, set to default Amazon endpoint in + # this case. 
+ client_kv_args["endpoint_url"] = endpoint + client_kv_args["use_ssl"] = provider.get("use_ssl") + client_kv_args["verify"] = provider.get("verify_ssl_cert") + + return client_kv_args + + +__all__ = ["SourceFilesAbstract", "make_s3_client", "make_s3_resource"] diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py index e65915b6af9d..784d639d64fb 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/source.py @@ -81,7 +81,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> found_a_file = False try: - for filepath in self.stream_class.filepath_iterator(logger, config.get("provider")): + for filepath in self.stream_class(**config).filepath_iterator(): found_a_file = True # TODO: will need to split config.get("path_pattern") up by stream once supporting multiple streams # test that matching on the pattern doesn't error diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py index abe4029f408b..df17391f605a 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py @@ -25,7 +25,6 @@ import json import re -from copy import deepcopy from typing import Union from jsonschema import RefResolver @@ -85,11 +84,12 @@ class SourceFilesAbstractSpec(BaseModel): @staticmethod def change_format_to_oneOf(schema: dict) -> dict: - schema["properties"]["format"]["type"] = "object" - if "oneOf" in schema["properties"]["format"]: - return schema - schema["properties"]["format"]["oneOf"] = deepcopy(schema["properties"]["format"]["anyOf"]) - del 
schema["properties"]["format"]["anyOf"] + props_to_change = ["format"] + for prop in props_to_change: + schema["properties"][prop]["type"] = "object" + if "oneOf" in schema["properties"][prop]: + continue + schema["properties"][prop]["oneOf"] = schema["properties"][prop].pop("anyOf") return schema @staticmethod diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py index 836c93d055b3..e988f3f2d80d 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py @@ -142,16 +142,13 @@ def storagefile_class(self) -> type: :return: reference to relevant class """ - @staticmethod @abstractmethod - def filepath_iterator(logger: AirbyteLogger, provider: dict) -> Iterator[str]: + def filepath_iterator() -> Iterator[str]: """ Provider-specific method to iterate through bucket/container/etc. and yield each full filepath. This should supply the 'url' to use in StorageFile(). This is possibly better described as blob or file path. e.g. 
for AWS: f"s3://{aws_access_key_id}:{aws_secret_access_key}@{self.url}" <- self.url is what we want to yield here - :param logger: instance of AirbyteLogger to use as this is a staticmethod - :param provider: provider specific mapping as described in spec.json :yield: url filepath to use in StorageFile() """ @@ -185,7 +182,7 @@ def get_storagefile_with_lastmod(filepath: str) -> Tuple[datetime, StorageFile]: # TODO: don't hardcode max_workers like this with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor: - filepath_gen = self.pattern_matched_filepath_iterator(self.filepath_iterator(self.logger, self._provider)) + filepath_gen = self.pattern_matched_filepath_iterator(self.filepath_iterator()) futures = [executor.submit(get_storagefile_with_lastmod, fp) for fp in filepath_gen] diff --git a/airbyte-integrations/connectors/source-s3/source_s3/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/stream.py index f59134aa40ec..c79a71c2e04a 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/stream.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/stream.py @@ -23,14 +23,14 @@ # -from typing import Any, Iterator, Mapping +from typing import Iterator -from airbyte_cdk.logger import AirbyteLogger from boto3 import session as boto3session from botocore import UNSIGNED from botocore.config import Config from .s3file import S3File +from .source_files_abstract import make_s3_client from .source_files_abstract.stream import IncrementalFileStream @@ -39,23 +39,24 @@ class IncrementalFileStreamS3(IncrementalFileStream): def storagefile_class(self) -> type: return S3File - @staticmethod - def _list_bucket(provider: Mapping[str, Any], accept_key=lambda k: True) -> Iterator[str]: + def _list_bucket(self, accept_key=lambda k: True) -> Iterator[str]: """ Wrapper for boto3's list_objects_v2 so we can handle pagination, filter by lambda func and operate with or without credentials - :param provider: provider specific mapping as 
described in spec.json :param accept_key: lambda function to allow filtering return keys, e.g. lambda k: not k.endswith('/'), defaults to lambda k: True :yield: key (name) of each object """ + provider = self._provider + + client_config = None if S3File.use_aws_account(provider): session = boto3session.Session( aws_access_key_id=provider["aws_access_key_id"], aws_secret_access_key=provider["aws_secret_access_key"] ) - client = session.client("s3") else: session = boto3session.Session() - client = session.client("s3", config=Config(signature_version=UNSIGNED)) + client_config = Config(signature_version=UNSIGNED) + client = make_s3_client(self._provider, config=client_config, session=session) ctoken = None while True: @@ -79,23 +80,18 @@ def _list_bucket(provider: Mapping[str, Any], accept_key=lambda k: True) -> Iter if not ctoken: break - @staticmethod - def filepath_iterator(logger: AirbyteLogger, provider: dict) -> Iterator[str]: + def filepath_iterator(self) -> Iterator[str]: """ See _list_bucket() for logic of interacting with S3 - :param logger: instance of AirbyteLogger to use as this is a staticmethod - :param provider: S3 provider mapping as described in spec.json :yield: url filepath to use in S3File() """ - prefix = provider.get("path_prefix") + prefix = self._provider.get("path_prefix") if prefix is None: prefix = "" - msg = f"Iterating S3 bucket '{provider['bucket']}'" - logger.info(msg + f" with prefix: '{prefix}' " if prefix != "" else msg) + msg = f"Iterating S3 bucket '{self._provider['bucket']}'" + self.logger.info(msg + f" with prefix: '{prefix}' " if prefix != "" else msg) - for blob in IncrementalFileStreamS3._list_bucket( - provider=provider, accept_key=lambda k: not k.endswith("/") # filter out 'folders', we just want actual blobs - ): + for blob in self._list_bucket(accept_key=lambda k: not k.endswith("/")): # filter out 'folders', we just want actual blobs yield blob diff --git a/docs/integrations/sources/s3.md 
b/docs/integrations/sources/s3.md index 6decfc026854..6dce6da594ba 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -6,6 +6,7 @@ The S3 source enables syncing of file-based tables with support for multiple fil You can choose if this connector will read only the new/updated files, or all the matching files, every time a sync is run. +Connector allows using either Amazon S3 storage or 3rd party S3 compatible service like Wasabi or custom S3 services set up with minio, leofs, ceph etc. ### Output Schema At this time, this source produces only a single stream (table) for the target files. @@ -156,7 +157,9 @@ For example: - `aws_access_key_id` : one half of the [required credentials](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys) for accessing a private bucket. - `aws_secret_access_key` : other half of the [required credentials](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys) for accessing a private bucket. - `path_prefix` : an optional string that limits the files returned by AWS when listing files to only that those starting with this prefix. This is different to path_pattern as it gets pushed down to the API call made to S3 rather than filtered in Airbyte and it does not accept pattern-style symbols (like wildcards `*`). We recommend using this if your bucket has many folders and files that are unrelated to this stream and all the relevant files will always sit under this chosen prefix. - +- `enpoint` : optional parameter that allow using of non Amazon S3 compatible services. Leave it blank for using default Amazon serivce. +- `use_ssl` : Allows using custom servers that configured to use plain http. Ignored in case of using Amazon service. +- `verify_ssl_cert` : Skip ssl validity check in case of using custom servers with self signed certificates. Ignored in case of using Amazon service. 
### File Format Settings The Reader in charge of loading the file format is currently based on [PyArrow](https://arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html) (Apache Arrow). Note that all files within one stream must adhere to the same read options for every provided format. @@ -195,6 +198,7 @@ You can find details on [here](https://arrow.apache.org/docs/python/generated/py | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| 0.1.5 | 2021-09-24 | [6398](https://github.com/airbytehq/airbyte/pull/6398) | Support custom non Amazon S3 services | | 0.1.4 | 2021-08-13 | [5305](https://github.com/airbytehq/airbyte/pull/5305) | Support of Parquet format | | 0.1.3 | 2021-08-04 | [5197](https://github.com/airbytehq/airbyte/pull/5197) | Fixed bug where sync could hang indefinitely on schema inference | | 0.1.2 | 2021-08-02 | [5135](https://github.com/airbytehq/airbyte/pull/5135) | Fixed bug in spec so it displays in UI correctly | From 4d3425de74af3a1875ee73146ef7041ee0c62ba1 Mon Sep 17 00:00:00 2001 From: Dmytro Rezchykov Date: Fri, 24 Sep 2021 15:34:28 +0300 Subject: [PATCH 2/3] fix review comments --- .../connectors/source-s3/acceptance-test-config.yml | 2 ++ .../connectors/source-s3/integration_tests/spec.json | 2 +- airbyte-integrations/connectors/source-s3/source_s3/source.py | 2 +- docs/integrations/sources/s3.md | 2 +- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml index 61817bd04ce9..05b739789073 100644 --- a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml @@ -14,6 +14,8 @@ tests: # for custom server - config_path: "integration_tests/config_minio.json" status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" discovery: # for 
CSV format - config_path: "secrets/config.json" diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json index 3da093640094..c50da5ffb4a7 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json +++ b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json @@ -163,7 +163,7 @@ }, "use_ssl": { "title": "Use Ssl", - "description": "Is remove server using secure SSL/TLS connection", + "description": "Is remote server using secure SSL/TLS connection", "type": "boolean" }, "verify_ssl_cert": { diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source.py b/airbyte-integrations/connectors/source-s3/source_s3/source.py index c0fd5207dfc5..78649ca9b8e6 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source.py @@ -57,7 +57,7 @@ class Config: ) endpoint: str = Field("", description="Endpoint to S3 compatable service, leave empty for default Amazon servers") - use_ssl: bool = Field(default=None, description="Is remove server using secure SSL/TLS connection") + use_ssl: bool = Field(default=None, description="Is remote server using secure SSL/TLS connection") verify_ssl_cert: bool = Field(default=None, description="Allow self signed certificates") provider: S3Provider diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index 6dce6da594ba..fae4e8e09744 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -157,7 +157,7 @@ For example: - `aws_access_key_id` : one half of the [required credentials](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys) for accessing a private bucket. 
- `aws_secret_access_key` : other half of the [required credentials](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys) for accessing a private bucket. - `path_prefix` : an optional string that limits the files returned by AWS when listing files to only that those starting with this prefix. This is different to path_pattern as it gets pushed down to the API call made to S3 rather than filtered in Airbyte and it does not accept pattern-style symbols (like wildcards `*`). We recommend using this if your bucket has many folders and files that are unrelated to this stream and all the relevant files will always sit under this chosen prefix. -- `enpoint` : optional parameter that allow using of non Amazon S3 compatible services. Leave it blank for using default Amazon serivce. +- `endpoint` : optional parameter that allows using of non Amazon S3 compatible services. Leave it blank for using the default Amazon service. - `use_ssl` : Allows using custom servers that configured to use plain http. Ignored in case of using Amazon service. - `verify_ssl_cert` : Skip ssl validity check in case of using custom servers with self signed certificates. Ignored in case of using Amazon service.
### File Format Settings From 8becd1c1c3ae28d214acd805254d1180650a73be Mon Sep 17 00:00:00 2001 From: Dmytro Rezchykov Date: Mon, 27 Sep 2021 10:56:06 +0300 Subject: [PATCH 3/3] fix review comments --- .../source-s3/acceptance-test-config.yml | 2 +- .../source-s3/integration_tests/acceptance.py | 1 - .../source-s3/integration_tests/spec.json | 2 +- .../source-s3/source_s3/s3_utils.py | 77 +++++++++++++++++++ .../connectors/source-s3/source_s3/s3file.py | 2 +- .../connectors/source-s3/source_s3/source.py | 2 +- .../source_files_abstract/__init__.py | 77 ------------------- .../connectors/source-s3/source_s3/stream.py | 2 +- 8 files changed, 82 insertions(+), 83 deletions(-) create mode 100644 airbyte-integrations/connectors/source-s3/source_s3/s3_utils.py diff --git a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml index 05b739789073..90146466b3eb 100644 --- a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml @@ -6,7 +6,7 @@ tests: - spec_path: "integration_tests/spec.json" connection: # for CSV format - - config_path: "integration_tests/config_minio.json" + - config_path: "secrets/config.json" status: "succeed" # for Parquet format - config_path: "secrets/parquet_config.json" diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-s3/integration_tests/acceptance.py index a9c2d0e6b0b3..00310d554984 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/acceptance.py +++ b/airbyte-integrations/connectors/source-s3/integration_tests/acceptance.py @@ -41,7 +41,6 @@ def connector_setup(): @pytest.fixture(scope="session", autouse=True) def minio_setup(): - """ This fixture is a placeholder for external resources that acceptance test might require.""" client = docker.from_env() tmp_dir = 
tempfile.mkdtemp() with ZipFile("./integration_tests/minio_data.zip") as archive: diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json index c50da5ffb4a7..b826eb56aa83 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json +++ b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json @@ -157,7 +157,7 @@ }, "endpoint": { "title": "Endpoint", - "description": "Endpoint to S3 compatable service, leave empty for default Amazon servers", + "description": "Endpoint to an S3 compatible service. Leave empty to use AWS.", "default": "", "type": "string" }, diff --git a/airbyte-integrations/connectors/source-s3/source_s3/s3_utils.py b/airbyte-integrations/connectors/source-s3/source_s3/s3_utils.py new file mode 100644 index 000000000000..d94c7feb7b41 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/source_s3/s3_utils.py @@ -0,0 +1,77 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + +import boto3.session +from botocore.client import Config + +# NOTE(review): dropped `from .source import SourceFilesAbstract` — it is unused in this module and created a circular import (source.py -> stream.py -> s3_utils.py -> source.py). + + +def make_s3_resource(provider: dict, session: boto3.session.Session, config: Config = None) -> object: + """ + Construct boto3 resource with specified config and remote endpoint + :param provider provider configuration from connector configuration. + :param session User session to create client from. + :param config Client config parameter in case of using creds from .aws/config file. + :return Boto3 S3 resource instance. + """ + client_kv_args = _get_s3_client_args(provider, config) + return session.resource("s3", **client_kv_args) + + +def make_s3_client(provider: dict, session: boto3.session.Session = None, config: Config = None) -> object: + """ + Construct boto3 client with specified config and remote endpoint + :param provider provider configuration from connector configuration. + :param session User session to create client from. Default boto3 session in case of session not specified. + :param config Client config parameter in case of using creds from .aws/config file. + :return Boto3 S3 client instance. + """ + client_kv_args = _get_s3_client_args(provider, config) + if session is None: + return boto3.client("s3", **client_kv_args) + else: + return session.client("s3", **client_kv_args) + + +def _get_s3_client_args(provider: dict, config: Config) -> dict: + """ + Returns map of args used for creating s3 boto3 client. + :param provider provider configuration from connector configuration. + :param config Client config parameter in case of using creds from .aws/config file. + :return map of s3 client arguments.
+ """ + client_kv_args = {"config": config} + endpoint = provider.get("endpoint") + if endpoint: + # endpoint could be None or empty string, set to default Amazon endpoint in + # this case. + client_kv_args["endpoint_url"] = endpoint + client_kv_args["use_ssl"] = provider.get("use_ssl") + client_kv_args["verify"] = provider.get("verify_ssl_cert") + + return client_kv_args + + +__all__ = ["SourceFilesAbstract", "make_s3_client", "make_s3_resource"] diff --git a/airbyte-integrations/connectors/source-s3/source_s3/s3file.py b/airbyte-integrations/connectors/source-s3/source_s3/s3file.py index b60975abcf86..98c0483c37b9 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/s3file.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/s3file.py @@ -33,8 +33,8 @@ from botocore.client import Config as ClientConfig from botocore.config import Config from botocore.exceptions import NoCredentialsError +from source_s3.s3_utils import make_s3_client, make_s3_resource -from .source_files_abstract import make_s3_client, make_s3_resource from .source_files_abstract.storagefile import StorageFile diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source.py b/airbyte-integrations/connectors/source-s3/source_s3/source.py index 78649ca9b8e6..323ea84fd307 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source.py @@ -56,7 +56,7 @@ class Config: description="By providing a path-like prefix (e.g. myFolder/thisTable/) under which all the relevant files sit, we can optimise finding these in S3. This is optional but recommended if your bucket contains many folders/files.", ) - endpoint: str = Field("", description="Endpoint to S3 compatable service, leave empty for default Amazon servers") + endpoint: str = Field("", description="Endpoint to an S3 compatible service. 
Leave empty to use AWS.") use_ssl: bool = Field(default=None, description="Is remote server using secure SSL/TLS connection") verify_ssl_cert: bool = Field(default=None, description="Allow self signed certificates") diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/__init__.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/__init__.py index 4e88ebd87316..9db886e0930f 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/__init__.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/__init__.py @@ -21,80 +21,3 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. # -""" -MIT License - -Copyright (c) 2020 Airbyte - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. 
-""" - -import boto3.session -from botocore.client import Config - -from .source import SourceFilesAbstract - - -def make_s3_resource(provider: dict, session: boto3.session.Session, config: Config = None) -> object: - """ - Construct boto3 resource with specified config and remote endpoint - :param provider provider configuration from connector configuration. - :param session User session to create client from. - :param config Client config parameter in case of using creds from .aws/config file. - :return Boto3 S3 resource instance. - """ - client_kv_args = _get_s3_client_args(provider, config) - return session.resource("s3", **client_kv_args) - - -def make_s3_client(provider: dict, session: boto3.session.Session = None, config: Config = None) -> object: - """ - Construct boto3 client with specified config and remote endpoint - :param provider provider configuration from connector configuration. - :param session User session to create client from. Default boto3 sesion in case of session not specified. - :param config Client config parameter in case of using creds from .aws/config file. - :return Boto3 S3 client instance. - """ - client_kv_args = _get_s3_client_args(provider, config) - if session is None: - return boto3.client("s3", **client_kv_args) - else: - return session.client("s3", **client_kv_args) - - -def _get_s3_client_args(provider: dict, config: Config) -> dict: - """ - Returns map of args used for creating s3 boto3 client. - :param provider provider configuration from connector configuration. - :param config Client config parameter in case of using creds from .aws/config file. - :return map of s3 client arguments. - """ - client_kv_args = {"config": config} - endpoint = provider.get("endpoint") - if endpoint: - # endpoint could be None or empty string, set to default Amazon endpoint in - # this case. 
- client_kv_args["endpoint_url"] = endpoint - client_kv_args["use_ssl"] = provider.get("use_ssl") - client_kv_args["verify"] = provider.get("verify_ssl_cert") - - return client_kv_args - - -__all__ = ["SourceFilesAbstract", "make_s3_client", "make_s3_resource"] diff --git a/airbyte-integrations/connectors/source-s3/source_s3/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/stream.py index c79a71c2e04a..9ed55cf1a067 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/stream.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/stream.py @@ -28,9 +28,9 @@ from boto3 import session as boto3session from botocore import UNSIGNED from botocore.config import Config +from source_s3.s3_utils import make_s3_client from .s3file import S3File -from .source_files_abstract import make_s3_client from .source_files_abstract.stream import IncrementalFileStream