Skip to content

Commit

Permalink
🐛 Source Facebook Marketing: SAT ignored fields in TestFullRefresh.te…
Browse files Browse the repository at this point in the history
…st_sequential_reads

* Source Facebook Marketing airbytehq#5190 - estimate cost_per_estimated_ad_recallers for AdsInsights streams if not presented in records

* Source Facebook Marketing airbytehq#5190 - add ignored fields to full refresh test

* Source Facebook Marketing airbytehq#5190 - annotations

* Source Facebook Marketing airbytehq#5190 - reformat

* SAT airbytehq#5190 - delete remove_ignored_fields

* Source Facebook Marketing airbytehq#5190 - use dpath util for excluding fields

* Facebook marketing airbytehq#5190 - follow EAFP principle

* Facebook Marketing airbytehq#5190 - add unit tests.

* Source Facebook Marketing airbytehq#5190 - fixing according to PR

* Source Facebook Marketing airbytehq#5190 - support ignored fields by stream

* Source Facebook Marketing airbytehq#5190 - update docs

* Source Facebook Marketing airbytehq#5190 - merge conflicts

* Source Facebook Marketing airbytehq#5190 - bump SAT version

* Source Facebook Marketing airbytehq#5190 - fixing unit tests

* Source Facebook Marketing airbytehq#5190 - bump
  • Loading branch information
vitaliizazmic authored and schlattk committed Jan 4, 2022
1 parent 4d35df6 commit 95abd5c
Show file tree
Hide file tree
Showing 13 changed files with 100 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## 0.1.27
Add ignored fields for full refresh test (unit tests)

## 0.1.26
Add ignored fields for full refresh test

## 0.1.25
Fix incorrect nested strucutres compare.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ COPY setup.py ./
COPY pytest.ini ./
RUN pip install .

LABEL io.airbyte.version=0.1.25
LABEL io.airbyte.version=0.1.27
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin"]
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,18 @@ class BasicReadTestConfig(BaseConfig):


class FullRefreshConfig(BaseConfig):
"""Full refresh test config
Attributes:
ignored_fields for each stream, list of fields path. Path should be in format "object_key/object_key2"
"""

config_path: str = config_path
configured_catalog_path: str = configured_catalog_path
timeout_seconds: int = timeout_seconds
ignored_fields: Optional[Mapping[str, List[str]]] = Field(
description="For each stream, list of fields path ignoring in sequential reads test"
)


class IncrementalConfig(BaseConfig):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,48 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from collections import defaultdict
from functools import partial
from logging import Logger

import pytest
from airbyte_cdk.models import Type
from airbyte_cdk.models import ConfiguredAirbyteCatalog, Type
from source_acceptance_test.base import BaseTest
from source_acceptance_test.utils import ConnectorRunner, full_refresh_only_catalog, make_hashable
from source_acceptance_test.config import ConnectionTestConfig
from source_acceptance_test.utils import ConnectorRunner, SecretDict, full_refresh_only_catalog, make_hashable


@pytest.mark.default_timeout(20 * 60)
class TestFullRefresh(BaseTest):
def test_sequential_reads(self, connector_config, configured_catalog, docker_runner: ConnectorRunner, detailed_logger):
def test_sequential_reads(
self,
inputs: ConnectionTestConfig,
connector_config: SecretDict,
configured_catalog: ConfiguredAirbyteCatalog,
docker_runner: ConnectorRunner,
detailed_logger: Logger,
):
ignored_fields = getattr(inputs, "ignored_fields") or {}
configured_catalog = full_refresh_only_catalog(configured_catalog)
output = docker_runner.call_read(connector_config, configured_catalog)
records_1 = [message.record.data for message in output if message.type == Type.RECORD]
records_1 = [message.record for message in output if message.type == Type.RECORD]
records_by_stream_1 = defaultdict(list)
for record in records_1:
records_by_stream_1[record.stream].append(record.data)

output = docker_runner.call_read(connector_config, configured_catalog)
records_2 = [message.record.data for message in output if message.type == Type.RECORD]
records_2 = [message.record for message in output if message.type == Type.RECORD]
records_by_stream_2 = defaultdict(list)
for record in records_2:
records_by_stream_2[record.stream].append(record.data)

output_diff = set(map(make_hashable, records_1)).symmetric_difference(set(map(make_hashable, records_2)))
if output_diff:
msg = "The two sequential reads should produce either equal set of records or one of them is a strict subset of the other"
detailed_logger.info(msg)
detailed_logger.log_json_list(output_diff)
pytest.fail(msg)
for stream in records_by_stream_1.keys():
serializer = partial(make_hashable, exclude_fields=ignored_fields.get(stream))
stream_records_1 = records_by_stream_1.get(stream)
stream_records_2 = records_by_stream_2.get(stream)
output_diff = set(map(serializer, stream_records_1)).symmetric_difference(set(map(serializer, stream_records_2)))
if output_diff:
msg = f"{stream}: the two sequential reads should produce either equal set of records or one of them is a strict subset of the other"
detailed_logger.info(msg)
detailed_logger.log_json_list(output_diff)
pytest.fail(msg)
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


import functools
from typing import List, Mapping, Optional

import dpath.exceptions
import dpath.util
import icdiff
import py
from pprintpp import pformat
Expand Down Expand Up @@ -78,8 +79,20 @@ class ListWithHashMixin(HashMixin, list):
pass


def make_hashable(obj):
def make_hashable(obj, exclude_fields: List[str] = None) -> str:
"""
Simplify comparison of nested dicts/lists
:param obj value for comparison
:param exclude_fields if value is Mapping, some fields can be excluded
"""
if isinstance(obj, Mapping):
# If value is Mapping, some fields can be excluded
exclude_fields = exclude_fields or []
for field in exclude_fields:
try:
dpath.util.delete(obj, field)
except dpath.exceptions.PathNotFound:
pass
return DictWithHashMixin(obj)
if isinstance(obj, List):
return ListWithHashMixin(obj)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from functools import partial

import pytest
from source_acceptance_test.utils.compare import make_hashable

Expand Down Expand Up @@ -148,3 +150,17 @@ def test_compare_two_records_nested_with_different_orders(obj1, obj2, is_same):
assert not output_diff, f"{obj1} should be equal to {obj2}"
else:
assert output_diff, f"{obj1} shouldnt be equal to {obj2}"


def test_exclude_fields():
"""Test that check ignoring fields"""
data = [
sorted_data(),
]
ignored_fields = [
"organization_id",
]
serializer = partial(make_hashable, exclude_fields=ignored_fields)
output = map(serializer, data)
for item in output:
assert "organization_id" not in item
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ tests:
future_state_path: "integration_tests/future_state.json"
full_refresh:
- config_path: "secrets/config.json"
# TODO Change below `configured_catalog_without_insights.json` to `configured_catalog.json` after October 7 2021
# because all running campaigns should be finished by that time.
configured_catalog_path: "integration_tests/configured_catalog_without_insights.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
# Ad Insights API has estimated metrics in response, which is calculated based on another metrics.
# Sometimes API doesn't return estimated metrics. E.g, cost_per_estimated_ad_recallers is calculated
# as total amount spent divided by estimated ad recall lift rate. When second metric is equal to zero
# API may or may not return value. Such behavior causes sequential reads test failing.
# Because one read response contains this metric, and other doesn't.
# Therefore, it's needed to ignore fields like this in API responses.
ignored_fields:
"ads_insights_age_and_gender": ["cost_per_estimated_ad_recallers"]
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,4 @@
"destination_sync_mode": "append"
}
]
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"credentials_json": "{\n \"type\": \"service_account\"}\n",
"email": "test_email",
"lookback": 0
"credentials_json": "{\n \"type\": \"service_account\"}\n",
"email": "test_email",
"lookback": 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@
"type": "string"
}
},
"required": [
"time",
"uniqueQualifier",
"applicationName",
"customerId"
]
"required": ["time", "uniqueQualifier", "applicationName", "customerId"]
},
"etag": {
"type": "string"
Expand All @@ -41,9 +36,7 @@
"type": "string"
}
},
"required": [
"email"
]
"required": ["email"]
},
"ipAddress": {
"type": "string"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@
"type": "string"
}
},
"required": [
"time",
"uniqueQualifier",
"applicationName",
"customerId"
]
"required": ["time", "uniqueQualifier", "applicationName", "customerId"]
},
"etag": {
"type": "string"
Expand All @@ -44,11 +39,7 @@
"type": "string"
}
},
"required": [
"callerType",
"email",
"profileId"
]
"required": ["callerType", "email", "profileId"]
},
"events": {
"type": "array",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@
"type": "string"
}
},
"required": [
"time",
"uniqueQualifier",
"applicationName",
"customerId"
]
"required": ["time", "uniqueQualifier", "applicationName", "customerId"]
},
"etag": {
"type": "string"
Expand All @@ -41,10 +36,7 @@
"type": "string"
}
},
"required": [
"email",
"profileId"
]
"required": ["email", "profileId"]
},
"ipAddress": {
"type": "string"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ This test performs two read operations on all streams which support full refresh
| `config_path` | string | `secrets/config.json` | Path to a JSON object representing a valid connector configuration |
| `configured_catalog_path` | string | `integration_tests/configured_catalog.json` | Path to configured catalog |
| `timeout_seconds` | int | 20\*60 | Test execution timeout in seconds |
| `ignored_fields` | dict | None |For each stream, list of fields path ignoring in sequential reads test|

## Test Incremental sync

Expand Down

0 comments on commit 95abd5c

Please sign in to comment.