diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index b689032f7a10..026db2b453c6 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.63 +- Define `namespace` property on the `Stream` class inside `core.py`. + ## 0.1.62 Bugfix: Correctly obfuscate nested secrets and secrets specified inside oneOf blocks inside the connector's spec. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py index 97ad28c29a54..02199df40c31 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py @@ -112,6 +112,9 @@ def get_json_schema(self) -> Mapping[str, Any]: def as_airbyte_stream(self) -> AirbyteStream: stream = AirbyteStream(name=self.name, json_schema=dict(self.get_json_schema()), supported_sync_modes=[SyncMode.full_refresh]) + if self.namespace: + stream.namespace = self.namespace + if self.supports_incremental: stream.source_defined_cursor = self.source_defined_cursor stream.supported_sync_modes.append(SyncMode.incremental) # type: ignore @@ -141,6 +144,14 @@ def cursor_field(self) -> Union[str, List[str]]: """ return [] + @property + def namespace(self) -> Optional[str]: + """ + Override to return the namespace of this stream, e.g. the Postgres schema which this stream will emit records for. + :return: A string containing the name of the namespace. + """ + return None + @property def source_defined_cursor(self) -> bool: """ diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 53ef5967d6d9..64f8a66f7480 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.1.62", + version="0.1.63", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/streams/test_streams_core.py b/airbyte-cdk/python/unit_tests/sources/streams/test_streams_core.py index df01edc17ea0..82fe96d412c2 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/test_streams_core.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/test_streams_core.py @@ -57,6 +57,26 @@ def read_records( cursor_field = "test_cursor" primary_key = "primary_key" + namespace = "test_namespace" + + +class StreamStubIncrementalEmptyNamespace(Stream): + """ + Stub full incremental class, with empty namespace, to assist with testing. + """ + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + pass + + cursor_field = "test_cursor" + primary_key = "primary_key" + namespace = "" def test_as_airbyte_stream_incremental(mocker): @@ -71,6 +91,7 @@ def test_as_airbyte_stream_incremental(mocker): exp = AirbyteStream( name="stream_stub_incremental", + namespace="test_namespace", json_schema={}, supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], default_cursor_field=["test_cursor"], @@ -99,6 +120,47 @@ def test_supports_incremental_cursor_not_set(): assert not test_stream.supports_incremental +def test_namespace_set(): + """ + Should allow namespace property to be set. + """ + test_stream = StreamStubIncremental() + + assert test_stream.namespace == "test_namespace" + + +def test_namespace_set_to_empty_string(mocker): + """ + Should not set namespace property if equal to empty string. + """ + test_stream = StreamStubIncremental() + + mocker.patch.object(StreamStubIncremental, "get_json_schema", return_value={}) + mocker.patch.object(StreamStubIncremental, "namespace", "") + + airbyte_stream = test_stream.as_airbyte_stream() + + exp = AirbyteStream( + name="stream_stub_incremental", + json_schema={}, + supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], + default_cursor_field=["test_cursor"], + source_defined_cursor=True, + source_defined_primary_key=[["primary_key"]], + namespace=None, + ) + assert exp == airbyte_stream + + +def test_namespace_not_set(): + """ + Should be equal to unset value of None. + """ + test_stream = StreamStubFullRefresh() + + assert test_stream.namespace is None + + @pytest.mark.parametrize( "test_input, expected", [("key", [["key"]]), (["key1", "key2"], [["key1"], ["key2"]]), ([["key1", "key2"], ["key3"]], [["key1", "key2"], ["key3"]])],