Skip to content

Commit

Permalink
🏗️ SAT: check record object fields overlaps with schema
Browse files Browse the repository at this point in the history
  • Loading branch information
avida authored Sep 29, 2021
1 parent bbf098a commit 8b021a8
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.19
Assert a non-empty overlap between the fields present in the record and the declared json schema.

## 0.1.18
Fix checking date-time format against nullable field.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ COPY setup.py ./
COPY pytest.ini ./
RUN pip install .

LABEL io.airbyte.version=0.1.18
LABEL io.airbyte.version=0.1.19
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin"]
1 change: 1 addition & 0 deletions airbyte-integrations/bases/source-acceptance-test/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"pprintpp~=0.4",
"dpath~=2.0.1",
"jsonschema~=3.2.0",
"jsonref==0.2",
]

setuptools.setup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@
from collections import Counter, defaultdict
from functools import reduce
from logging import Logger
from typing import Any, Dict, List, Mapping, MutableMapping
from typing import Any, Dict, List, Mapping, MutableMapping, Set

import dpath.util
import pytest
from airbyte_cdk.models import AirbyteMessage, ConnectorSpecification, Status, Type
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, Status, Type
from docker.errors import ContainerError
from jsonschema import validate
from source_acceptance_test.base import BaseTest
from source_acceptance_test.config import BasicReadTestConfig, ConnectionTestConfig
from source_acceptance_test.utils import ConnectorRunner, SecretDict, filter_output, serialize, verify_records_schema
from source_acceptance_test.utils.json_schema_helper import JsonSchemaHelper
from source_acceptance_test.utils.json_schema_helper import JsonSchemaHelper, get_expected_schema_structure, get_object_structure


@pytest.mark.default_timeout(10)
Expand Down Expand Up @@ -126,10 +126,37 @@ def primary_keys_for_records(streams, records):
@pytest.mark.default_timeout(5 * 60)
class TestBasicRead(BaseTest):
@staticmethod
def _validate_records_structure(records: List[AirbyteRecordMessage], configured_catalog: ConfiguredAirbyteCatalog):
    """
    Check that each record's object structure overlaps with its stream's schema.

    Plain jsonschema validation is not always sufficient: a schema with
    additionalProperties set to true and no required fields accepts any
    arbitrary object. To catch those cases we extract every field path
    present in the record and compare it against the paths declared by the
    stream's json schema; an empty intersection triggers an assertion error.
    :param records: List of airbyte record messages gathered from connector instances.
    :param configured_catalog: SAT testcase parameters parsed from yaml file
    """
    # Pre-compute the expected path set once per stream.
    expected_paths_by_stream: Dict[str, Set] = {
        stream.stream.name: set(get_expected_schema_structure(stream.stream.json_schema))
        for stream in configured_catalog.streams
    }

    for record in records:
        expected_paths = expected_paths_by_stream.get(record.stream)
        if not expected_paths:
            # Stream unknown to the catalog, or schema declares no paths.
            continue
        actual_paths = set(get_object_structure(record.data))
        common_fields = actual_paths & expected_paths
        assert common_fields, f" Record from {record.stream} stream should have some fields mentioned by json schema, {expected_paths}"

@staticmethod
def _validate_schema(records: List[AirbyteRecordMessage], configured_catalog: ConfiguredAirbyteCatalog):
"""
Check if data type and structure in records matches the one in json_schema of the stream in catalog
"""
TestBasicRead._validate_records_structure(records, configured_catalog)
bar = "-" * 80
streams_errors = verify_records_schema(records, configured_catalog)
for stream_name, errors in streams_errors.items():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import dpath.util
import pendulum
from jsonref import JsonRef


class CatalogField:
Expand Down Expand Up @@ -129,3 +130,61 @@ def get_top_level_item(variant_path: List[str]):
assert any(
[all(["const" in var["properties"][prop] for var in variants]) for prop in common_props]
), f"Any of {common_props} properties in {'.'.join(variant_path)} has no const keyword. See specification reference at https://docs.airbyte.io/connector-development/connector-specification-reference"


def get_object_structure(obj: dict) -> List[str]:
    """
    Traverse through object structure and compose a list of property keys including nested ones.

    The returned list reflects the object's structure as the list of all
    property key paths (e.g. ``{"a": {"b": 1}}`` -> ``["/a", "/a/b"]``).
    If an object is nested inside an array, only the first element is
    traversed and assumed representative of the rest (path segment "[]").
    :param obj: data object to get its structure
    :returns: list of object property key paths
    """
    paths: List[str] = []

    def _traverse(node: Any, path: str = "") -> None:
        # Record the path of every node except the (pathless) root.
        if path:
            paths.append(path)
        if isinstance(node, dict):
            for key, value in node.items():
                _traverse(value, path + "/" + key)
        elif isinstance(node, list) and node:
            # Arrays contribute a "[]" segment; only the first element is
            # inspected (assumed to share the structure of its siblings).
            _traverse(node[0], path + "/[]")

    _traverse(obj)
    return paths


def get_expected_schema_structure(schema: dict) -> List[str]:
    """
    Traverse through a json schema and compose the list of property key paths
    that a conforming object is expected to have.
    :param schema: jsonschema to get expected paths
    :returns: list of object property key paths
    """
    paths: List[str] = []
    # Resolve all $ref references up front to simplify schema processing.
    resolved_schema = JsonRef.replace_refs(schema)

    def _walk(subschema: dict, path: str = "") -> None:
        # oneOf/anyOf: every variant is treated as an object schema.
        if "oneOf" in subschema or "anyOf" in subschema:
            for variant in subschema.get("oneOf") or subschema.get("anyOf"):
                _walk({"type": "object", **variant}, path)
            return
        declared_types = subschema.get("type", ["null"])
        if not isinstance(declared_types, list):
            declared_types = [declared_types]
        if "object" in declared_types:
            properties = subschema.get("properties")
            if properties:
                for name, prop_schema in properties.items():
                    _walk(prop_schema, path + "/" + name)
            elif path:
                # Object with arbitrary properties, e.g.
                # {"type": "object", "additionalProperties": {"type": "string"}}
                paths.append(path)
            return
        if "array" in declared_types:
            _walk(subschema.get("items", {}), path + "/[]")
            return
        paths.append(path)

    _walk(resolved_schema)
    return paths
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from unittest.mock import MagicMock

import pytest
from airbyte_cdk.models import AirbyteStream
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, Type
from source_acceptance_test.config import BasicReadTestConfig
from source_acceptance_test.tests.test_core import TestBasicRead as _TestBasicRead
from source_acceptance_test.tests.test_core import TestDiscovery as _TestDiscovery


Expand All @@ -28,3 +32,42 @@ def test_discovery(schema, cursors, should_fail):
t.test_defined_cursors_exist_in_schema(None, discovered_catalog)
else:
t.test_defined_cursors_exist_in_schema(None, discovered_catalog)


@pytest.mark.parametrize(
    "schema, record, should_fail",
    [
        ({"type": "object"}, {"aa": 23}, False),
        ({"type": "object"}, {}, False),
        ({"type": "object", "properties": {"created": {"type": "string"}}}, {"aa": 23}, True),
        ({"type": "object", "properties": {"created": {"type": "string"}}}, {"created": "23"}, False),
        ({"type": "object", "properties": {"created": {"type": "string"}}}, {"root": {"created": "23"}}, True),
        # Recharge shop stream case
        (
            {"type": "object", "properties": {"shop": {"type": ["null", "object"]}, "store": {"type": ["null", "object"]}}},
            {"shop": {"a": "23"}, "store": {"b": "23"}},
            False,
        ),
    ],
)
def test_read(schema, record, should_fail):
    """A read must fail when the record shares no field paths with the stream's json schema."""
    configured_stream = ConfiguredAirbyteStream(
        stream=AirbyteStream.parse_obj({"name": "test_stream", "json_schema": schema}),
        sync_mode="full_refresh",
        destination_sync_mode="overwrite",
    )
    catalog = ConfiguredAirbyteCatalog(streams=[configured_stream])
    docker_runner_mock = MagicMock()
    docker_runner_mock.call_read.return_value = [
        AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data=record, emitted_at=111))
    ]
    test_instance = _TestBasicRead()
    if should_fail:
        with pytest.raises(AssertionError, match="stream should have some fields mentioned by json schema"):
            test_instance.test_read(None, catalog, BasicReadTestConfig(), [], docker_runner_mock, MagicMock())
    else:
        test_instance.test_read(None, catalog, BasicReadTestConfig(), [], docker_runner_mock, MagicMock())
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
)
from pydantic import BaseModel
from source_acceptance_test.tests.test_incremental import records_with_state
from source_acceptance_test.utils.json_schema_helper import JsonSchemaHelper
from source_acceptance_test.utils.json_schema_helper import JsonSchemaHelper, get_expected_schema_structure, get_object_structure


@pytest.fixture(name="simple_state")
Expand Down Expand Up @@ -167,3 +167,53 @@ class Root(BaseModel):
assert variant_paths == [["properties", "f", "anyOf"], ["definitions", "C", "properties", "e", "anyOf"]]
# TODO: implement validation for pydantic generated objects as well
# js_helper.validate_variant_paths(variant_paths)


@pytest.mark.parametrize(
    "obj, expected_paths",
    [
        ({}, []),
        ({"a": 12}, ["/a"]),
        ({"a": {"b": 12}}, ["/a", "/a/b"]),
        ({"a": {"b": 12}, "c": 45}, ["/a", "/a/b", "/c"]),
        (
            {"a": [{"b": 12}]},
            ["/a", "/a/[]", "/a/[]/b"],
        ),
        # Only the first array element is traversed.
        ({"a": [{"b": 12}, {"b": 15}]}, ["/a", "/a/[]", "/a/[]/b"]),
        ({"a": [[[{"b": 12}, {"b": 15}]]]}, ["/a", "/a/[]", "/a/[]/[]", "/a/[]/[]/[]", "/a/[]/[]/[]/b"]),
    ],
)
# NOTE(review): "strucutre" in the test name is a typo, kept so existing
# test selection (-k filters, CI reports) is not broken by this change.
def test_get_object_strucutre(obj, expected_paths):
    """get_object_structure should list every property key path of the object."""
    assert get_object_structure(obj) == expected_paths


@pytest.mark.parametrize(
    "schema, expected_paths",
    [
        ({"type": "object", "properties": {"a": {"type": "string"}}}, ["/a"]),
        ({"type": "object", "properties": {"a": {"type": "string"}, "b": {"type": "number"}}}, ["/a", "/b"]),
        # $ref definitions are resolved before paths are extracted.
        (
            {
                "type": "object",
                "properties": {"a": {"type": "string"}, "b": {"$ref": "#definitions/b_type"}},
                "definitions": {"b_type": {"type": "number"}},
            },
            ["/a", "/b"],
        ),
        ({"type": "object", "oneOf": [{"properties": {"a": {"type": "string"}}}, {"properties": {"b": {"type": "string"}}}]}, ["/a", "/b"]),
        # Some of the pydantic generated schemas use the anyOf keyword
        ({"type": "object", "anyOf": [{"properties": {"a": {"type": "string"}}}, {"properties": {"b": {"type": "string"}}}]}, ["/a", "/b"]),
        (
            {"type": "array", "items": {"oneOf": [{"properties": {"a": {"type": "string"}}}, {"properties": {"b": {"type": "string"}}}]}},
            ["/[]/a", "/[]/b"],
        ),
        # There could be an object with arbitrary properties of a specific type
        ({"type": "object", "properties": {"a": {"type": "object", "additionalProperties": {"type": "string"}}}}, ["/a"]),
        # Array with no item type specified
        ({"type": "array"}, ["/[]"]),
        ({"type": "array", "items": {"type": "object", "additionalProperties": {"type": "string"}}}, ["/[]"]),
    ],
)
def test_get_expected_schema_structure(schema, expected_paths):
    """get_expected_schema_structure should list every property path the schema declares."""
    assert get_expected_schema_structure(schema) == expected_paths

0 comments on commit 8b021a8

Please sign in to comment.