Skip to content

Commit

Permalink
SAT: add threshold_days incremental test option (#12715)
Browse files Browse the repository at this point in the history
* SAT: add `threshold_days` incremental test option

* fix: support cursor values that are already dates

* dont use constant value

* update docs

* use pendulum for date parsing

* bump cdk version

* use pendulum for duration

* add support for unix timestamps

* bump version, update changelog
  • Loading branch information
pedroslopez authored May 11, 2022
1 parent ffd4a01 commit 9126ebc
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 0.1.51
- Add `threshold_days` option for lookback window support in incremental tests.
- Update CDK to prevent warnings when encountering new `AirbyteTraceMessage`s.

## 0.1.50
Added support for passing a `.yaml` file as `spec_path`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY source_acceptance_test ./source_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.1.50
LABEL io.airbyte.version=0.1.51
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/source-acceptance-test/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import setuptools

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1.25",
"airbyte-cdk~=0.1.56",
"docker~=5.0.3",
"PyYAML~=5.4",
"icdiff~=1.9",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ class IncrementalConfig(BaseConfig):
)
future_state_path: Optional[str] = Field(description="Path to a state file with values in far future")
timeout_seconds: int = timeout_seconds
threshold_days: int = Field(
description="Allow records to be emitted with a cursor value this number of days before the state cursor",
default=0,
ge=0,
)


class TestConfig(BaseConfig):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


import json
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable, Mapping, Tuple

import pendulum
import pytest
from airbyte_cdk.models import ConfiguredAirbyteCatalog, Type
from source_acceptance_test import BaseTest
from source_acceptance_test.utils import ConnectorRunner, JsonSchemaHelper, filter_output, incremental_only_catalog
from source_acceptance_test.config import IncrementalConfig
from source_acceptance_test.utils import ConnectorRunner, JsonSchemaHelper, SecretDict, filter_output, incremental_only_catalog


@pytest.fixture(name="future_state_path")
Expand Down Expand Up @@ -76,9 +78,41 @@ def records_with_state(records, state, stream_mapping, state_cursor_paths) -> It
yield record_value, state_value, stream_name


def compare_cursor_with_threshold(record_value, state_value, threshold_days: int) -> bool:
"""
Checks if the record's cursor value is older or equal to the state cursor value.
If the threshold_days option is set, the values will be converted to dates so that the time-based offset can be applied.
:raises: pendulum.parsing.exceptions.ParserError: if threshold_days is passed with non-date cursor values.
"""
if threshold_days:

def _parse_date_value(value) -> datetime:
if isinstance(value, datetime):
return value
if isinstance(value, (int, float)):
return pendulum.from_timestamp(value / 1000)
return pendulum.parse(value)

record_date_value = _parse_date_value(record_value)
state_date_value = _parse_date_value(state_value)

return record_date_value >= (state_date_value - pendulum.duration(days=threshold_days))

return record_value >= state_value


@pytest.mark.default_timeout(20 * 60)
class TestIncremental(BaseTest):
def test_two_sequential_reads(self, connector_config, configured_catalog_for_incremental, cursor_paths, docker_runner: ConnectorRunner):
def test_two_sequential_reads(
self,
inputs: IncrementalConfig,
connector_config: SecretDict,
configured_catalog_for_incremental: ConfiguredAirbyteCatalog,
cursor_paths: dict[str, list[str]],
docker_runner: ConnectorRunner,
):
threshold_days = getattr(inputs, "threshold_days") or 0
stream_mapping = {stream.stream.name: stream for stream in configured_catalog_for_incremental.streams}

output = docker_runner.call_read(connector_config, configured_catalog_for_incremental)
Expand All @@ -98,8 +132,8 @@ def test_two_sequential_reads(self, connector_config, configured_catalog_for_inc
records_2 = filter_output(output, type_=Type.RECORD)

for record_value, state_value, stream_name in records_with_state(records_2, latest_state, stream_mapping, cursor_paths):
assert (
record_value >= state_value
assert compare_cursor_with_threshold(
record_value, state_value, threshold_days
), f"Second incremental sync should produce records older or equal to cursor value from the state. Stream: {stream_name}"

def test_state_with_abnormally_large_values(self, connector_config, configured_catalog, future_state, docker_runner: ConnectorRunner):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from datetime import datetime
from unittest.mock import MagicMock

import pendulum
import pytest
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
Type,
)
from source_acceptance_test.config import IncrementalConfig
from source_acceptance_test.tests.test_incremental import TestIncremental as _TestIncremental
from source_acceptance_test.tests.test_incremental import compare_cursor_with_threshold


def build_messages_from_record_data(records: list[dict]) -> list[AirbyteMessage]:
return [
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data=data, emitted_at=111)) for data in records
]


def build_state_message(state: dict) -> AirbyteMessage:
return AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=state))


@pytest.mark.parametrize(
"record_value, state_value, threshold_days, expected_result",
[
(datetime(2020, 10, 10), datetime(2020, 10, 9), 0, True),
(datetime(2020, 10, 10), datetime(2020, 10, 11), 0, False),
(datetime(2020, 10, 10), datetime(2020, 10, 11), 1, True),
(pendulum.parse("2020-10-10"), pendulum.parse("2020-10-09"), 0, True),
(pendulum.parse("2020-10-10"), pendulum.parse("2020-10-11"), 0, False),
(pendulum.parse("2020-10-10"), pendulum.parse("2020-10-11"), 1, True),
("2020-10-10", "2020-10-09", 0, True),
("2020-10-10", "2020-10-11", 0, False),
("2020-10-10", "2020-10-11", 1, True),
(1602288000000, 1602201600000, 0, True),
(1602288000000, 1602374400000, 0, False),
(1602288000000, 1602374400000, 1, True),
(1602288000, 1602201600, 0, True),
(1602288000, 1602374400, 0, False),
(1602288000, 1602374400, 1, True),
("aaa", "bbb", 0, False),
("bbb", "aaa", 0, True),
],
)
def test_compare_cursor_with_threshold(record_value, state_value, threshold_days, expected_result):
assert compare_cursor_with_threshold(record_value, state_value, threshold_days) == expected_result


@pytest.mark.parametrize("cursor_type", ["date", "string"])
@pytest.mark.parametrize(
"records1, records2, latest_state, threshold_days, expected_error",
[
([{"date": "2020-01-01"}, {"date": "2020-01-02"}], [], "2020-01-02", 0, None),
([{"date": "2020-01-02"}, {"date": "2020-01-03"}], [], "2020-01-02", 0, "First incremental sync should produce records younger"),
([{"date": "2020-01-01"}, {"date": "2020-01-02"}], [{"date": "2020-01-02"}, {"date": "2020-01-03"}], "2020-01-02", 0, None),
([{"date": "2020-01-01"}], [{"date": "2020-01-01"}], "2020-01-02", 0, "Second incremental sync should produce records older"),
([{"date": "2020-01-01"}, {"date": "2020-01-02"}], [{"date": "2020-01-01"}, {"date": "2020-01-02"}], "2020-01-03", 2, None),
([{"date": "2020-01-02"}, {"date": "2020-01-03"}], [], "2020-01-02", 2, "First incremental sync should produce records younger"),
([{"date": "2020-01-01"}], [{"date": "2020-01-02"}], "2020-01-06", 3, "Second incremental sync should produce records older"),
],
)
def test_incremental_two_sequential_reads(records1, records2, latest_state, threshold_days, cursor_type, expected_error):
input_config = IncrementalConfig(threshold_days=threshold_days)
cursor_paths = {"test_stream": ["date"]}
catalog = ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
stream=AirbyteStream(
name="test_stream",
json_schema={"type": "object", "properties": {"date": {"type": cursor_type}}},
supported_sync_modes=["full_refresh", "incremental"],
),
sync_mode="incremental",
destination_sync_mode="overwrite",
cursor_field=["date"],
)
]
)

docker_runner_mock = MagicMock()
docker_runner_mock.call_read.return_value = [*build_messages_from_record_data(records1), build_state_message({"date": latest_state})]
docker_runner_mock.call_read_with_state.return_value = build_messages_from_record_data(records2)

t = _TestIncremental()
if expected_error:
with pytest.raises(AssertionError, match=expected_error):
t.test_two_sequential_reads(
inputs=input_config,
connector_config=MagicMock(),
configured_catalog_for_incremental=catalog,
cursor_paths=cursor_paths,
docker_runner=docker_runner_mock,
)
else:
t.test_two_sequential_reads(
inputs=input_config,
connector_config=MagicMock(),
configured_catalog_for_incremental=catalog,
cursor_paths=cursor_paths,
docker_runner=docker_runner_mock,
)
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,13 @@ This test performs two read operations on all streams which support full refresh

This test verifies that all streams in the input catalog which support incremental sync can do so correctly. It does this by running two read operations: the first takes the configured catalog and config provided to this test as input. It then verifies that the sync produced a non-zero number of `RECORD` and `STATE` messages. The second read takes the same catalog and config used in the first test, plus the last `STATE` message output by the first read operation as the input state file. It verifies that either no records are produced \(since we read all records in the first sync\) or all records that produced have cursor value greater or equal to cursor value from `STATE` message. This test is performed only for streams that support incremental. Streams that do not support incremental sync are ignored. If no streams in the input catalog support incremental sync, this test is skipped.

| Input | Type | Default | Note |
| :--- | :--- | :--- | :--- |
| `config_path` | string | `secrets/config.json` | Path to a JSON object representing a valid connector configuration |
| `configured_catalog_path` | string | `integration_tests/configured_catalog.json` | Path to configured catalog |
| `cursor_paths` | dict | {} | For each stream, the path of its cursor field in the output state messages. If omitted the path will be taken from the last piece of path from stream cursor\_field. |
| `timeout_seconds` | int | 20\*60 | Test execution timeout in seconds |
| Input | Type | Default | Note |
|:--------------------------|:-------|:--------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `config_path` | string | `secrets/config.json` | Path to a JSON object representing a valid connector configuration |
| `configured_catalog_path` | string | `integration_tests/configured_catalog.json` | Path to configured catalog |
| `cursor_paths` | dict | {} | For each stream, the path of its cursor field in the output state messages. If omitted the path will be taken from the last piece of path from stream cursor\_field. |
| `timeout_seconds` | int | 20\*60 | Test execution timeout in seconds |
| `threshold_days` | int | 0 | For date-based cursors, allow records to be emitted with a cursor value this number of days before the state value. |

### TestStateWithAbnormallyLargeValues

Expand Down

0 comments on commit 9126ebc

Please sign in to comment.