diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py new file mode 100644 index 000000000000..a627dd353df5 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py @@ -0,0 +1,35 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import itertools +from collections import ChainMap +from typing import Any, Iterable, List, Mapping + +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer + + +class CartesianProductStreamSlicer(StreamSlicer): + """ + Stream slicers that iterates over the cartesian product of input stream slicers + Given 2 stream slicers with the following slices: + A: [{"i": 0}, {"i": 1}, {"i": 2}] + B: [{"s": "hello"}, {"s": "world"}] + the resulting stream slices are + [ + {"i": 0, "s": "hello"}, + {"i": 0, "s": "world"}, + {"i": 1, "s": "hello"}, + {"i": 1, "s": "world"}, + {"i": 2, "s": "hello"}, + {"i": 2, "s": "world"}, + ] + """ + + def __init__(self, stream_slicers: List[StreamSlicer]): + self._stream_slicers = stream_slicers + + def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: + sub_slices = (s.stream_slices(sync_mode, stream_state) for s in self._stream_slicers) + return (ChainMap(*a) for a in itertools.product(*sub_slices)) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py index ff6a8ed33bd8..c68e9fa77ef0 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py @@ -27,4 +27,4 @@ def __init__(self, slice_values: Union[str, List[str]], slice_definition: Mappin self._config = config def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: - return (self._interpolation.eval(self._config, slice_value=slice_value, literal_eval=True) for slice_value in self._slice_values) + return [self._interpolation.eval(self._config, slice_value=slice_value) for slice_value in self._slice_values] diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py new file mode 100644 index 000000000000..29cc1d58eadb --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py @@ -0,0 +1,61 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import pytest as pytest +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer +from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer +from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer + + +@pytest.mark.parametrize( + "test_name, stream_slicers, expected_slices", + [ + ( + "test_single_stream_slicer", + [ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, None)], + [{"owner_resource": "customer"}, {"owner_resource": "store"}, {"owner_resource": "subscription"}], + ), + ( + "test_two_stream_slicers", + [ + ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, None), + ListStreamSlicer(["A", "B"], {"letter": "{{ slice_value }}"}, None), + ], + [ + {"owner_resource": "customer", "letter": "A"}, + {"owner_resource": "customer", "letter": "B"}, + {"owner_resource": "store", "letter": "A"}, + {"owner_resource": "store", "letter": "B"}, + {"owner_resource": "subscription", "letter": "A"}, + {"owner_resource": "subscription", "letter": "B"}, + ], + ), + ( + "test_list_and_datetime", + [ + ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, None), + DatetimeStreamSlicer( + InterpolatedString("2021-01-01"), InterpolatedString("2021-01-03"), "1d", InterpolatedString(""), "%Y-%m-%d", None + ), + ], + [ + {"owner_resource": "customer", "start_date": "2021-01-01", "end_date": "2021-01-01"}, + {"owner_resource": "customer", "start_date": "2021-01-02", "end_date": "2021-01-02"}, + {"owner_resource": "customer", "start_date": "2021-01-03", "end_date": "2021-01-03"}, + {"owner_resource": "store", "start_date": "2021-01-01", "end_date": "2021-01-01"}, + {"owner_resource": "store", "start_date": "2021-01-02", "end_date": "2021-01-02"}, + {"owner_resource": "store", "start_date": "2021-01-03", "end_date": "2021-01-03"}, + {"owner_resource": "subscription", "start_date": "2021-01-01", "end_date": "2021-01-01"}, + {"owner_resource": "subscription", "start_date": "2021-01-02", "end_date": "2021-01-02"}, + {"owner_resource": "subscription", "start_date": "2021-01-03", "end_date": "2021-01-03"}, + ], + ), + ], +) +def test_substream_slicer(test_name, stream_slicers, expected_slices): + slicer = CartesianProductStreamSlicer(stream_slicers) + slices = [s for s in slicer.stream_slices(SyncMode.incremental, stream_state=None)] + assert slices == expected_slices