diff --git a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md index 57503e8a35d4..c8761c57b6ee 100644 --- a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md +++ b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.12 +Improve error message when data mismatches schema: https://github.com/airbytehq/airbyte/pull/4753 + ## 0.1.11 Fix error in the naming of method `test_match_expected` for class `TestSpec`. @@ -21,4 +24,4 @@ Add: `test_spec` additionally checks if Dockerfile has `ENV AIRBYTE_ENTRYPOINT` Add test whether PKs present and not None if `source_defined_primary_key` defined: https://github.com/airbytehq/airbyte/pull/4140 ## 0.1.5 -Add configurable timeout for the acceptance tests: https://github.com/airbytehq/airbyte/pull/4296 \ No newline at end of file +Add configurable timeout for the acceptance tests: https://github.com/airbytehq/airbyte/pull/4296 diff --git a/airbyte-integrations/bases/source-acceptance-test/Dockerfile b/airbyte-integrations/bases/source-acceptance-test/Dockerfile index 4f2df614441c..9af69a4bc23a 100644 --- a/airbyte-integrations/bases/source-acceptance-test/Dockerfile +++ b/airbyte-integrations/bases/source-acceptance-test/Dockerfile @@ -8,7 +8,7 @@ COPY setup.py ./ COPY pytest.ini ./ RUN pip install . -LABEL io.airbyte.version=0.1.11 +LABEL io.airbyte.version=0.1.12 LABEL io.airbyte.name=airbyte/source-acceptance-test ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin"] diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py index 528f22eb886b..3c2290d40d5c 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py @@ -134,16 +134,16 @@ def test_read( output = docker_runner.call_read(connector_config, configured_catalog) records = [message.record for message in output if message.type == Type.RECORD] counter = Counter(record.stream for record in records) - if inputs.validate_schema: - streams_with_errors = set() - for record, errors in verify_records_schema(records, configured_catalog): - if record.stream not in streams_with_errors: - logging.error(f"The {record.stream} stream has the following schema errors: {errors}") - streams_with_errors.add(record.stream) - - if streams_with_errors: - pytest.fail(f"Please check your json_schema in selected streams {streams_with_errors}.") + bar = "-" * 80 + streams_errors = verify_records_schema(records, configured_catalog) + for stream_name, errors in streams_errors.items(): + errors = map(str, errors.values()) + str_errors = f"\n{bar}\n".join(errors) + logging.error(f"The {stream_name} stream has the following schema errors:\n{str_errors}") + + if streams_errors: + pytest.fail(f"Please check your json_schema in selected streams {streams_errors.keys()}.") all_streams = set(stream.stream.name for stream in configured_catalog.streams) streams_with_records = set(counter.keys()) diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/asserts.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/asserts.py index 6bff896c427e..aecd97fdd9a7 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/asserts.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/asserts.py @@ -23,7 +23,8 @@ # import logging -from typing import Iterator, List, Tuple +from collections import defaultdict +from typing import List, Mapping from airbyte_cdk.models import AirbyteRecordMessage, ConfiguredAirbyteCatalog from jsonschema import Draft4Validator, ValidationError @@ -31,7 +32,7 @@ def verify_records_schema( records: List[AirbyteRecordMessage], catalog: ConfiguredAirbyteCatalog -) -> Iterator[Tuple[AirbyteRecordMessage, List[ValidationError]]]: +) -> Mapping[str, Mapping[str, ValidationError]]: """Check records against their schemas from the catalog, yield error messages. Only first record with error will be yielded for each stream. """ @@ -39,6 +40,8 @@ def verify_records_schema( for stream in catalog.streams: validators[stream.stream.name] = Draft4Validator(stream.stream.json_schema) + stream_errors = defaultdict(dict) + for record in records: validator = validators.get(record.stream) if not validator: @@ -46,5 +49,7 @@ def verify_records_schema( continue errors = list(validator.iter_errors(record.data)) - if errors: - yield record, sorted(errors, key=str) + for error in errors: + stream_errors[record.stream][str(error.schema_path)] = error + + return stream_errors diff --git a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_asserts.py b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_asserts.py index 86de6ac6ea91..fb16aa3ddb49 100644 --- a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_asserts.py +++ b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_asserts.py @@ -89,13 +89,10 @@ def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog): records = [AirbyteRecordMessage(stream="my_stream", data=record, emitted_at=0) for record in records] - records_with_errors, record_errors = zip(*verify_records_schema(records, configured_catalog)) - errors = [[error.message for error in errors] for errors in record_errors] + streams_with_errors = verify_records_schema(records, configured_catalog) + errors = [error.message for error in streams_with_errors["my_stream"].values()] - assert len(records_with_errors) == 3, "only 3 out of 4 records have errors" - assert records_with_errors[0] == records[0], "1st record should have errors" - assert records_with_errors[1] == records[1], "2nd record should have errors" - assert records_with_errors[2] == records[3], "4th record should have errors" - assert errors[0] == ["'text' is not of type 'number'", "123 is not of type 'null', 'string'"] - assert errors[1] == ["None is not of type 'number'", "None is not of type 'string'"] - assert errors[2] == ["'text' is not of type 'number'"] + assert "my_stream" in streams_with_errors + assert len(streams_with_errors) == 1, "only one stream" + assert len(streams_with_errors["my_stream"]) == 3, "only first error for each field" + assert errors == ["123 is not of type 'null', 'string'", "'text' is not of type 'number'", "None is not of type 'string'"]