From dbbab8ffee9f6f1ce5cdea1247a25fcb48a2b048 Mon Sep 17 00:00:00 2001 From: Oliver Meyer <42039965+olivermeyer@users.noreply.github.com> Date: Mon, 12 Jul 2021 19:43:01 +0200 Subject: [PATCH 1/3] Source Dixa: Pin tz in ConversationExport.ms_timestamp_to_datetime (#4696) --- .../source-dixa/source_dixa/source.py | 6 ++++-- .../source-dixa/unit_tests/unit_test.py | 20 +++++++++++++++++-- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-dixa/source_dixa/source.py b/airbyte-integrations/connectors/source-dixa/source_dixa/source.py index c2ec725b6f46..e18e8d06601e 100644 --- a/airbyte-integrations/connectors/source-dixa/source_dixa/source.py +++ b/airbyte-integrations/connectors/source-dixa/source_dixa/source.py @@ -24,7 +24,7 @@ from abc import ABC -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple import requests @@ -60,7 +60,9 @@ def ms_timestamp_to_datetime(milliseconds: int) -> datetime: """ Converts a millisecond-precision timestamp to a datetime object. """ - return datetime.fromtimestamp(ConversationExport._validate_ms_timestamp(milliseconds) / 1000) + return datetime.fromtimestamp( + ConversationExport._validate_ms_timestamp(milliseconds) / 1000, tz=timezone.utc + ) @staticmethod def datetime_to_ms_timestamp(dt: datetime) -> int: diff --git a/airbyte-integrations/connectors/source-dixa/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-dixa/unit_tests/unit_test.py index c6806f54c012..29fb56823e32 100644 --- a/airbyte-integrations/connectors/source-dixa/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-dixa/unit_tests/unit_test.py @@ -49,13 +49,29 @@ def test_validate_ms_timestamp_with_invalid_input_length(): def test_ms_timestamp_to_datetime(): assert ConversationExport.ms_timestamp_to_datetime(1625312980123) == datetime( - year=2021, month=7, day=3, hour=13, minute=49, second=40, microsecond=123000 + year=2021, + month=7, + day=3, + hour=11, + minute=49, + second=40, + microsecond=123000, + tzinfo=timezone.utc ) def test_datetime_to_ms_timestamp(): assert ( - ConversationExport.datetime_to_ms_timestamp(datetime(year=2021, month=7, day=3, hour=13, minute=49, second=40, microsecond=123000)) + ConversationExport.datetime_to_ms_timestamp(datetime( + year=2021, + month=7, + day=3, + hour=11, + minute=49, + second=40, + microsecond=123000, + tzinfo=timezone.utc) + ) == 1625312980123 ) From 8f2ab0914cca9732b17fe15c1884396a9174eb0b Mon Sep 17 00:00:00 2001 From: "Sherif A. Nada" Date: Mon, 12 Jul 2021 11:11:47 -0700 Subject: [PATCH 2/3] Source Dixa: add to connector index (#4701) --- .../0b5c867e-1b12-4d02-ab74-97b2184ff6d7.json | 7 +++ .../resources/seed/source_definitions.yaml | 5 ++ .../source-dixa/source_dixa/source.py | 4 +- .../source-dixa/unit_tests/unit_test.py | 63 +++++-------------- 4 files changed, 28 insertions(+), 51 deletions(-) create mode 100644 airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/0b5c867e-1b12-4d02-ab74-97b2184ff6d7.json diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/0b5c867e-1b12-4d02-ab74-97b2184ff6d7.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/0b5c867e-1b12-4d02-ab74-97b2184ff6d7.json new file mode 100644 index 000000000000..3766cbd56b33 --- /dev/null +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/0b5c867e-1b12-4d02-ab74-97b2184ff6d7.json @@ -0,0 +1,7 @@ +{ + "sourceDefinitionId": "0b5c867e-1b12-4d02-ab74-97b2184ff6d7", + "name": "Dixa", + "dockerRepository": "airbyte/source-dixa", + "dockerImageTag": "0.1.0", + "documentationUrl": "https://docs.airbyte.io/integrations/sources/dixa" +} diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index cb0b26003caa..8f6158028fe0 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -368,3 +368,8 @@ dockerRepository: airbyte/source-paypal-transaction dockerImageTag: 0.1.0 documentationUrl: https://docs.airbyte.io/integrations/sources/paypal-transaction +- sourceDefinitionId: 0b5c867e-1b12-4d02-ab74-97b2184ff6d7 + name: Dixa + dockerRepository: airbyte/source-dixa + dockerImageTag: 0.1.0 + documentationUrl: https://docs.airbyte.io/integrations/sources/dixa diff --git a/airbyte-integrations/connectors/source-dixa/source_dixa/source.py b/airbyte-integrations/connectors/source-dixa/source_dixa/source.py index e18e8d06601e..23d807053102 100644 --- a/airbyte-integrations/connectors/source-dixa/source_dixa/source.py +++ b/airbyte-integrations/connectors/source-dixa/source_dixa/source.py @@ -60,9 +60,7 @@ def ms_timestamp_to_datetime(milliseconds: int) -> datetime: """ Converts a millisecond-precision timestamp to a datetime object. """ - return datetime.fromtimestamp( - ConversationExport._validate_ms_timestamp(milliseconds) / 1000, tz=timezone.utc - ) + return datetime.fromtimestamp(ConversationExport._validate_ms_timestamp(milliseconds) / 1000, tz=timezone.utc) @staticmethod def datetime_to_ms_timestamp(dt: datetime) -> int: diff --git a/airbyte-integrations/connectors/source-dixa/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-dixa/unit_tests/unit_test.py index 29fb56823e32..5a2891bd1345 100644 --- a/airbyte-integrations/connectors/source-dixa/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-dixa/unit_tests/unit_test.py @@ -20,6 +20,8 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +# + from datetime import datetime, timezone import pytest @@ -28,9 +30,7 @@ @pytest.fixture def conversation_export(): - return ConversationExport( - start_date=datetime(year=2021, month=7, day=1, hour=12, tzinfo=timezone.utc), batch_size=1, logger=None - ) + return ConversationExport(start_date=datetime(year=2021, month=7, day=1, hour=12, tzinfo=timezone.utc), batch_size=1, logger=None) def test_validate_ms_timestamp_with_valid_input(): @@ -49,28 +49,14 @@ def test_validate_ms_timestamp_with_invalid_input_length(): def test_ms_timestamp_to_datetime(): assert ConversationExport.ms_timestamp_to_datetime(1625312980123) == datetime( - year=2021, - month=7, - day=3, - hour=11, - minute=49, - second=40, - microsecond=123000, - tzinfo=timezone.utc + year=2021, month=7, day=3, hour=11, minute=49, second=40, microsecond=123000, tzinfo=timezone.utc ) def test_datetime_to_ms_timestamp(): assert ( - ConversationExport.datetime_to_ms_timestamp(datetime( - year=2021, - month=7, - day=3, - hour=11, - minute=49, - second=40, - microsecond=123000, - tzinfo=timezone.utc) + ConversationExport.datetime_to_ms_timestamp( + datetime(year=2021, month=7, day=3, hour=11, minute=49, second=40, microsecond=123000, tzinfo=timezone.utc) ) == 1625312980123 ) @@ -83,14 +69,8 @@ def test_add_days_to_ms_timestamp(): def test_stream_slices_without_state(conversation_export): conversation_export.end_timestamp = 1625270400001 # 2021-07-03 00:00:00 + 1 ms expected_slices = [ - { - 'updated_after': 1625140800000, # 2021-07-01 12:00:00 - 'updated_before': 1625227200000 # 2021-07-02 12:00:00 - }, - { - 'updated_after': 1625227200000, - 'updated_before': 1625270400001 - } + {"updated_after": 1625140800000, "updated_before": 1625227200000}, # 2021-07-01 12:00:00 # 2021-07-02 12:00:00 + {"updated_after": 1625227200000, "updated_before": 1625270400001}, ] actual_slices = conversation_export.stream_slices() assert actual_slices == expected_slices @@ -101,12 +81,7 @@ def test_stream_slices_without_state_large_batch(): start_date=datetime(year=2021, month=7, day=1, hour=12, tzinfo=timezone.utc), batch_size=31, logger=None ) conversation_export.end_timestamp = 1625270400001 # 2021-07-03 00:00:00 + 1 ms - expected_slices = [ - { - 'updated_after': 1625140800000, # 2021-07-01 12:00:00 - 'updated_before': 1625270400001 - } - ] + expected_slices = [{"updated_after": 1625140800000, "updated_before": 1625270400001}] # 2021-07-01 12:00:00 actual_slices = conversation_export.stream_slices() assert actual_slices == expected_slices @@ -123,26 +98,18 @@ def test_stream_slices_with_start_timestamp_larger_than_state(): Test that if start_timestamp is larger than state, then start at start_timestamp. """ conversation_export = ConversationExport( - start_date=datetime(year=2021, month=12, day=1, tzinfo=timezone.utc), batch_size=31, - logger=None + start_date=datetime(year=2021, month=12, day=1, tzinfo=timezone.utc), batch_size=31, logger=None ) conversation_export.end_timestamp = 1638360000001 # 2021-12-01 12:00:00 + 1 ms - expected_slices = [ - { - 'updated_after': 1638316800000, # 2021-07-01 12:00:00 - 'updated_before': 1638360000001 - } - ] - actual_slices = conversation_export.stream_slices( - stream_state={'updated_at': 1625220000000} # # 2021-07-02 12:00:00 - ) + expected_slices = [{"updated_after": 1638316800000, "updated_before": 1638360000001}] # 2021-07-01 12:00:00 + actual_slices = conversation_export.stream_slices(stream_state={"updated_at": 1625220000000}) # # 2021-07-02 12:00:00 assert actual_slices == expected_slices def test_get_updated_state_without_state(conversation_export): - assert conversation_export.get_updated_state( - current_stream_state=None, latest_record={'updated_at': 1625263200000} - ) == {'updated_at': 1625140800000} + assert conversation_export.get_updated_state(current_stream_state=None, latest_record={"updated_at": 1625263200000}) == { + "updated_at": 1625140800000 + } def test_get_updated_state_with_bigger_state(conversation_export): From 63c7a33e3e864acf76d7b704a413316084597fd3 Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Mon, 12 Jul 2021 11:22:05 -0700 Subject: [PATCH 3/3] allow injecting filters for server (#4677) * allow injecting filters * fmt --- .../java/io/airbyte/server/ServerApp.java | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index ef68ad34271a..ffea3cbd392b 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -60,10 +60,14 @@ import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.IOException; import java.nio.file.Path; +import java.util.Collections; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import javax.ws.rs.container.ContainerRequestFilter; +import javax.ws.rs.container.ContainerResponseFilter; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; @@ -89,13 +93,19 @@ public class ServerApp { private final ConfigRepository configRepository; private final JobPersistence jobPersistence; private final Configs configs; + private final Set requestFilters; + private final Set responseFilters; public ServerApp(final ConfigRepository configRepository, final JobPersistence jobPersistence, - final Configs configs) { + final Configs configs, + final Set requestFilters, + final Set responseFilters) { this.configRepository = configRepository; this.jobPersistence = jobPersistence; this.configs = configs; + this.requestFilters = requestFilters; + this.responseFilters = responseFilters; } public void start() throws Exception { @@ -123,9 +133,9 @@ public void start() throws Exception { ResourceConfig rc = new ResourceConfig() - // todo (cgardens) - the CORs settings are wide open. will need to revisit when we add auth. - // cors - .register(new CorsFilter()) + // add filters + .registerInstances(requestFilters) + .registerInstances(responseFilters) // request logging .register(new RequestLogger(mdc)) // api @@ -183,7 +193,9 @@ private static void setCustomerIdIfNotSet(final ConfigRepository configRepositor } } - public static void main(String[] args) throws Exception { + public static void runServer(final Set requestFilters, + final Set responseFilters) + throws Exception { final Configs configs = new EnvConfigs(); MDC.put(LogClientSingleton.WORKSPACE_MDC_KEY, LogClientSingleton.getServerLogsRoot(configs).toString()); @@ -233,13 +245,17 @@ public static void main(String[] args) throws Exception { if (airbyteDatabaseVersion.isPresent() && AirbyteVersion.isCompatible(airbyteVersion, airbyteDatabaseVersion.get())) { LOGGER.info("Starting server..."); - new ServerApp(configRepository, jobPersistence, configs).start(); + new ServerApp(configRepository, jobPersistence, configs, requestFilters, responseFilters).start(); } else { LOGGER.info("Start serving version mismatch errors. Automatic migration either failed or didn't run"); new VersionMismatchServer(airbyteVersion, airbyteDatabaseVersion.get(), PORT).start(); } } + public static void main(String[] args) throws Exception { + runServer(Collections.emptySet(), Set.of(new CorsFilter())); + } + /** * Ideally when automatic migration runs, we should make sure that we acquire a lock on database and * no other operation is allowed