From 0365c294a94ac607e0235b93efa31ed151625459 Mon Sep 17 00:00:00 2001 From: "Pedro S. Lopez" Date: Wed, 11 May 2022 16:05:14 -0400 Subject: [PATCH] SAT: add `threshold_days` incremental test option (#12715) * SAT: add `threshold_days` incremental test option * fix: support cursor values that are already dates * dont use constant value * update docs * use pendulum for date parsing * bump cdk version * use pendulum for duration * add support for unix timestamps * bump version, update changelog --- .../bases/source-acceptance-test/CHANGELOG.md | 4 + .../bases/source-acceptance-test/Dockerfile | 2 +- .../bases/source-acceptance-test/setup.py | 2 +- .../source_acceptance_test/config.py | 5 + .../tests/test_incremental.py | 44 ++++++- .../unit_tests/test_incremental.py | 112 ++++++++++++++++++ .../source-acceptance-tests-reference.md | 13 +- 7 files changed, 169 insertions(+), 13 deletions(-) create mode 100644 airbyte-integrations/bases/source-acceptance-test/unit_tests/test_incremental.py diff --git a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md index 9301236d90ed..b6fb971fe5bc 100644 --- a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md +++ b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 0.1.51 +- Add `threshold_days` option for lookback window support in incremental tests. +- Update CDK to prevent warnings when encountering new `AirbyteTraceMessage`s. + ## 0.1.50 Added support for passing a `.yaml` file as `spec_path`. diff --git a/airbyte-integrations/bases/source-acceptance-test/Dockerfile b/airbyte-integrations/bases/source-acceptance-test/Dockerfile index 8ff75150da7f..3351bd6eab6d 100644 --- a/airbyte-integrations/bases/source-acceptance-test/Dockerfile +++ b/airbyte-integrations/bases/source-acceptance-test/Dockerfile @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./ COPY source_acceptance_test ./source_acceptance_test RUN pip install . -LABEL io.airbyte.version=0.1.50 +LABEL io.airbyte.version=0.1.51 LABEL io.airbyte.name=airbyte/source-acceptance-test ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"] diff --git a/airbyte-integrations/bases/source-acceptance-test/setup.py b/airbyte-integrations/bases/source-acceptance-test/setup.py index 1b487e137730..28b53bf2dd05 100644 --- a/airbyte-integrations/bases/source-acceptance-test/setup.py +++ b/airbyte-integrations/bases/source-acceptance-test/setup.py @@ -6,7 +6,7 @@ import setuptools MAIN_REQUIREMENTS = [ - "airbyte-cdk~=0.1.25", + "airbyte-cdk~=0.1.56", "docker~=5.0.3", "PyYAML~=5.4", "icdiff~=1.9", diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/config.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/config.py index 6051aa81160b..44c9e3d13365 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/config.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/config.py @@ -105,6 +105,11 @@ class IncrementalConfig(BaseConfig): ) future_state_path: Optional[str] = Field(description="Path to a state file with values in far future") timeout_seconds: int = timeout_seconds + threshold_days: int = Field( + description="Allow records to be emitted with a cursor value this number of days before the state cursor", + default=0, + ge=0, + ) class TestConfig(BaseConfig): diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py index dc9db26749c9..34b295991a6b 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py @@ -2,15 +2,17 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. # - import json +from datetime import datetime from pathlib import Path from typing import Any, Iterable, Mapping, Tuple +import pendulum import pytest from airbyte_cdk.models import ConfiguredAirbyteCatalog, Type from source_acceptance_test import BaseTest -from source_acceptance_test.utils import ConnectorRunner, JsonSchemaHelper, filter_output, incremental_only_catalog +from source_acceptance_test.config import IncrementalConfig +from source_acceptance_test.utils import ConnectorRunner, JsonSchemaHelper, SecretDict, filter_output, incremental_only_catalog @pytest.fixture(name="future_state_path") @@ -76,9 +78,41 @@ def records_with_state(records, state, stream_mapping, state_cursor_paths) -> It yield record_value, state_value, stream_name +def compare_cursor_with_threshold(record_value, state_value, threshold_days: int) -> bool: + """ + Checks if the record's cursor value is older or equal to the state cursor value. + + If the threshold_days option is set, the values will be converted to dates so that the time-based offset can be applied. + :raises: pendulum.parsing.exceptions.ParserError: if threshold_days is passed with non-date cursor values. + """ + if threshold_days: + + def _parse_date_value(value) -> datetime: + if isinstance(value, datetime): + return value + if isinstance(value, (int, float)): + return pendulum.from_timestamp(value / 1000) + return pendulum.parse(value) + + record_date_value = _parse_date_value(record_value) + state_date_value = _parse_date_value(state_value) + + return record_date_value >= (state_date_value - pendulum.duration(days=threshold_days)) + + return record_value >= state_value + + @pytest.mark.default_timeout(20 * 60) class TestIncremental(BaseTest): - def test_two_sequential_reads(self, connector_config, configured_catalog_for_incremental, cursor_paths, docker_runner: ConnectorRunner): + def test_two_sequential_reads( + self, + inputs: IncrementalConfig, + connector_config: SecretDict, + configured_catalog_for_incremental: ConfiguredAirbyteCatalog, + cursor_paths: dict[str, list[str]], + docker_runner: ConnectorRunner, + ): + threshold_days = getattr(inputs, "threshold_days") or 0 stream_mapping = {stream.stream.name: stream for stream in configured_catalog_for_incremental.streams} output = docker_runner.call_read(connector_config, configured_catalog_for_incremental) @@ -98,8 +132,8 @@ def test_two_sequential_reads(self, connector_config, configured_catalog_for_inc records_2 = filter_output(output, type_=Type.RECORD) for record_value, state_value, stream_name in records_with_state(records_2, latest_state, stream_mapping, cursor_paths): - assert ( - record_value >= state_value + assert compare_cursor_with_threshold( + record_value, state_value, threshold_days ), f"Second incremental sync should produce records older or equal to cursor value from the state. Stream: {stream_name}" def test_state_with_abnormally_large_values(self, connector_config, configured_catalog, future_state, docker_runner: ConnectorRunner): diff --git a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_incremental.py b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_incremental.py new file mode 100644 index 000000000000..150addea0f3a --- /dev/null +++ b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_incremental.py @@ -0,0 +1,112 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + +from datetime import datetime +from unittest.mock import MagicMock + +import pendulum +import pytest +from airbyte_cdk.models import ( + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStateMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + Type, +) +from source_acceptance_test.config import IncrementalConfig +from source_acceptance_test.tests.test_incremental import TestIncremental as _TestIncremental +from source_acceptance_test.tests.test_incremental import compare_cursor_with_threshold + + +def build_messages_from_record_data(records: list[dict]) -> list[AirbyteMessage]: + return [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data=data, emitted_at=111)) for data in records + ] + + +def build_state_message(state: dict) -> AirbyteMessage: + return AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=state)) + + +@pytest.mark.parametrize( + "record_value, state_value, threshold_days, expected_result", + [ + (datetime(2020, 10, 10), datetime(2020, 10, 9), 0, True), + (datetime(2020, 10, 10), datetime(2020, 10, 11), 0, False), + (datetime(2020, 10, 10), datetime(2020, 10, 11), 1, True), + (pendulum.parse("2020-10-10"), pendulum.parse("2020-10-09"), 0, True), + (pendulum.parse("2020-10-10"), pendulum.parse("2020-10-11"), 0, False), + (pendulum.parse("2020-10-10"), pendulum.parse("2020-10-11"), 1, True), + ("2020-10-10", "2020-10-09", 0, True), + ("2020-10-10", "2020-10-11", 0, False), + ("2020-10-10", "2020-10-11", 1, True), + (1602288000000, 1602201600000, 0, True), + (1602288000000, 1602374400000, 0, False), + (1602288000000, 1602374400000, 1, True), + (1602288000, 1602201600, 0, True), + (1602288000, 1602374400, 0, False), + (1602288000, 1602374400, 1, True), + ("aaa", "bbb", 0, False), + ("bbb", "aaa", 0, True), + ], +) +def test_compare_cursor_with_threshold(record_value, state_value, threshold_days, expected_result): + assert compare_cursor_with_threshold(record_value, state_value, threshold_days) == expected_result + + +@pytest.mark.parametrize("cursor_type", ["date", "string"]) +@pytest.mark.parametrize( + "records1, records2, latest_state, threshold_days, expected_error", + [ + ([{"date": "2020-01-01"}, {"date": "2020-01-02"}], [], "2020-01-02", 0, None), + ([{"date": "2020-01-02"}, {"date": "2020-01-03"}], [], "2020-01-02", 0, "First incremental sync should produce records younger"), + ([{"date": "2020-01-01"}, {"date": "2020-01-02"}], [{"date": "2020-01-02"}, {"date": "2020-01-03"}], "2020-01-02", 0, None), + ([{"date": "2020-01-01"}], [{"date": "2020-01-01"}], "2020-01-02", 0, "Second incremental sync should produce records older"), + ([{"date": "2020-01-01"}, {"date": "2020-01-02"}], [{"date": "2020-01-01"}, {"date": "2020-01-02"}], "2020-01-03", 2, None), + ([{"date": "2020-01-02"}, {"date": "2020-01-03"}], [], "2020-01-02", 2, "First incremental sync should produce records younger"), + ([{"date": "2020-01-01"}], [{"date": "2020-01-02"}], "2020-01-06", 3, "Second incremental sync should produce records older"), + ], +) +def test_incremental_two_sequential_reads(records1, records2, latest_state, threshold_days, cursor_type, expected_error): + input_config = IncrementalConfig(threshold_days=threshold_days) + cursor_paths = {"test_stream": ["date"]} + catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name="test_stream", + json_schema={"type": "object", "properties": {"date": {"type": cursor_type}}}, + supported_sync_modes=["full_refresh", "incremental"], + ), + sync_mode="incremental", + destination_sync_mode="overwrite", + cursor_field=["date"], + ) + ] + ) + + docker_runner_mock = MagicMock() + docker_runner_mock.call_read.return_value = [*build_messages_from_record_data(records1), build_state_message({"date": latest_state})] + docker_runner_mock.call_read_with_state.return_value = build_messages_from_record_data(records2) + + t = _TestIncremental() + if expected_error: + with pytest.raises(AssertionError, match=expected_error): + t.test_two_sequential_reads( + inputs=input_config, + connector_config=MagicMock(), + configured_catalog_for_incremental=catalog, + cursor_paths=cursor_paths, + docker_runner=docker_runner_mock, + ) + else: + t.test_two_sequential_reads( + inputs=input_config, + connector_config=MagicMock(), + configured_catalog_for_incremental=catalog, + cursor_paths=cursor_paths, + docker_runner=docker_runner_mock, + ) diff --git a/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md b/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md index ca63ca12844d..1454868754e8 100644 --- a/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md +++ b/docs/connector-development/testing-connectors/source-acceptance-tests-reference.md @@ -185,12 +185,13 @@ This test performs two read operations on all streams which support full refresh This test verifies that all streams in the input catalog which support incremental sync can do so correctly. It does this by running two read operations: the first takes the configured catalog and config provided to this test as input. It then verifies that the sync produced a non-zero number of `RECORD` and `STATE` messages. The second read takes the same catalog and config used in the first test, plus the last `STATE` message output by the first read operation as the input state file. It verifies that either no records are produced \(since we read all records in the first sync\) or all records that produced have cursor value greater or equal to cursor value from `STATE` message. This test is performed only for streams that support incremental. Streams that do not support incremental sync are ignored. If no streams in the input catalog support incremental sync, this test is skipped. -| Input | Type | Default | Note | -| :--- | :--- | :--- | :--- | -| `config_path` | string | `secrets/config.json` | Path to a JSON object representing a valid connector configuration | -| `configured_catalog_path` | string | `integration_tests/configured_catalog.json` | Path to configured catalog | -| `cursor_paths` | dict | {} | For each stream, the path of its cursor field in the output state messages. If omitted the path will be taken from the last piece of path from stream cursor\_field. | -| `timeout_seconds` | int | 20\*60 | Test execution timeout in seconds | +| Input | Type | Default | Note | +|:--------------------------|:-------|:--------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `config_path` | string | `secrets/config.json` | Path to a JSON object representing a valid connector configuration | +| `configured_catalog_path` | string | `integration_tests/configured_catalog.json` | Path to configured catalog | +| `cursor_paths` | dict | {} | For each stream, the path of its cursor field in the output state messages. If omitted the path will be taken from the last piece of path from stream cursor\_field. | +| `timeout_seconds` | int | 20\*60 | Test execution timeout in seconds | +| `threshold_days` | int | 0 | For date-based cursors, allow records to be emitted with a cursor value this number of days before the state value. | ### TestStateWithAbnormallyLargeValues