diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index 0f017a8312bac..3060876a65994 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -514,8 +514,10 @@ def get_source_schema(self, source_id: str) -> Mapping[str, Any]: def does_dest_support_normalization( self, destination_definition_id: str, workspace_id: str - ) -> Dict[str, Any]: - return cast( + ) -> bool: + # Airbyte API changed source of truth for normalization in PR + # https://github.com/airbytehq/airbyte/pull/21005 + norm_dest_def_spec: bool = cast( Dict[str, Any], check.not_none( self.make_request_cached( @@ -528,6 +530,24 @@ def does_dest_support_normalization( ), ).get("supportsNormalization", False) + norm_dest_def: bool = ( + cast( + Dict[str, Any], + check.not_none( + self.make_request_cached( + endpoint="/destination_definitions/get", + data={ + "destinationDefinitionId": destination_definition_id, + }, + ) + ), + ) + .get("normalizationConfig", {}) + .get("supported", False) + ) + + return any([norm_dest_def_spec, norm_dest_def]) + def get_job_status(self, connection_id: str, job_id: int) -> Mapping[str, object]: if self.forward_logs: return check.not_none(self.make_request(endpoint="/jobs/get", data={"id": job_id})) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_resources.py index cfd7893603e5a..a1f2a7d0d7b33 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_resources.py @@ -495,3 +495,53 @@ def test_sync_and_poll_timeout( assert responses.assert_call_count(f"{ab_resource.api_base_url}/jobs/cancel", 1) is True else: assert responses.assert_call_count(f"{ab_resource.api_base_url}/jobs/cancel", 0) is True + + +@responses.activate +@pytest.mark.parametrize( + "supports_norm,norm_config_supported", + [ + (True, True), + (True, False), + (False, True), + (False, False), + ], +) +def test_normalization_support( + supports_norm: bool, + norm_config_supported: bool, + airbyte_instance_constructor: Callable[[Dict[str, Any]], AirbyteResource], +): + ab_resource = airbyte_instance_constructor( + { + "host": "some_host", + "port": "8000", + } + ) + # See https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#post-/v1/destination_definition_specifications/get + responses.post( + url=ab_resource.api_base_url + "/destination_definition_specifications/get", + json={"supportsNormalization": supports_norm}, + ) + # See https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#post-/v1/destination_definitions/get + responses.post( + url=ab_resource.api_base_url + "/destination_definitions/get", + json={"normalizationConfig": {"supported": norm_config_supported}}, + ) + + assert ab_resource.does_dest_support_normalization("some_destination", "some_workspace") == any( + [supports_norm, norm_config_supported] + ) + + # Check for expected behaviour when keys do not exist + responses.post( + url=ab_resource.api_base_url + "/destination_definition_specifications/get", + json={}, + ) + responses.post( + url=ab_resource.api_base_url + "/destination_definitions/get", + json={}, + ) + assert ( + ab_resource.does_dest_support_normalization("some_destination", "some_workspace") is False + )