diff --git a/.github/workflows/test_pyarrow17.yml b/.github/workflows/test_pyarrow17.yml
new file mode 100644
index 0000000000..dd48c2af9d
--- /dev/null
+++ b/.github/workflows/test_pyarrow17.yml
@@ -0,0 +1,77 @@
+
+name: tests marked as needspyarrow17
+
+on:
+  pull_request:
+    branches:
+      - master
+      - devel
+  workflow_dispatch:
+  schedule:
+    - cron: '0 2 * * *'
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
+  cancel-in-progress: true
+
+env:
+
+  DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}
+
+  RUNTIME__SENTRY_DSN: https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4504819859914752
+  RUNTIME__LOG_LEVEL: ERROR
+  RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
+
+  ACTIVE_DESTINATIONS: "[\"filesystem\"]"
+
+jobs:
+  get_docs_changes:
+    name: docs changes
+    uses: ./.github/workflows/get_docs_changes.yml
+    if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}
+
+  run_pyarrow17:
+    name: needspyarrow17 tests
+    needs: get_docs_changes
+    if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
+    defaults:
+      run:
+        shell: bash
+    runs-on: "ubuntu-latest"
+
+    steps:
+
+      - name: Check out
+        uses: actions/checkout@master
+
+      - name: Setup Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.10.x"
+
+      - name: Install Poetry
+        uses: snok/install-poetry@v1.3.2
+        with:
+          virtualenvs-create: true
+          virtualenvs-in-project: true
+          installer-parallel: true
+
+      - name: Load cached venv
+        id: cached-poetry-dependencies
+        uses: actions/cache@v3
+        with:
+          path: .venv
+          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-pyarrow17
+
+      - name: Install dependencies
+        run: poetry install --no-interaction --with sentry-sdk --with pipeline -E deltalake -E gs -E s3 -E az
+
+      - name: Upgrade pyarrow
+        run: poetry run pip install pyarrow==17.0.0
+
+      - name: create secrets.toml
+        run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
+
+      - run: |
+          poetry run pytest tests/libs tests/load -m needspyarrow17
+        name: Run needspyarrow17 tests Linux
diff --git a/dlt/common/exceptions.py b/dlt/common/exceptions.py
index 82906b0144..6a86ab5fbe 100644
--- a/dlt/common/exceptions.py
+++ b/dlt/common/exceptions.py
@@ -143,6 +143,25 @@ def _to_pip_install(self) -> str:
         return "\n".join([f'pip install "{d}"' for d in self.dependencies])
 
 
+class DependencyVersionException(DltException):
+    def __init__(
+        self, pkg_name: str, version_found: str, version_required: str, appendix: str = ""
+    ) -> None:
+        self.pkg_name = pkg_name
+        self.version_found = version_found
+        self.version_required = version_required
+        super().__init__(self._get_msg(appendix))
+
+    def _get_msg(self, appendix: str) -> str:
+        msg = (
+            f"Found `{self.pkg_name}=={self.version_found}`, while"
+            f" `{self.pkg_name}{self.version_required}` is required."
+        )
+        if appendix:
+            msg = msg + "\n" + appendix
+        return msg
+
+
 class SystemConfigurationException(DltException):
     pass
 
diff --git a/dlt/common/libs/deltalake.py b/dlt/common/libs/deltalake.py
index 6156ac24cc..04100b0c6c 100644
--- a/dlt/common/libs/deltalake.py
+++ b/dlt/common/libs/deltalake.py
@@ -1,15 +1,16 @@
 from typing import Optional, Dict, Union
+from pathlib import Path
 
 from dlt import version
 from dlt.common import logger
 from dlt.common.libs.pyarrow import pyarrow as pa
-from dlt.common.libs.pyarrow import dataset_to_table, cast_arrow_schema_types
+from dlt.common.libs.pyarrow import cast_arrow_schema_types
 from dlt.common.schema.typing import TWriteDisposition
 from dlt.common.exceptions import MissingDependencyException
 from dlt.common.storages import FilesystemConfiguration
 
 try:
-    from deltalake import write_deltalake
+    from deltalake import write_deltalake, DeltaTable
     from deltalake.writer import try_get_deltatable
 except ModuleNotFoundError:
     raise MissingDependencyException(
@@ -19,10 +20,10 @@
     )
 
 
-def ensure_delta_compatible_arrow_table(table: pa.table) -> pa.Table:
-    """Returns Arrow table compatible with Delta table format.
+def ensure_delta_compatible_arrow_schema(schema: pa.Schema) -> pa.Schema:
+    """Returns Arrow schema compatible with Delta table format.
 
-    Casts table schema to replace data types not supported by Delta.
+    Casts schema to replace data types not supported by Delta.
     """
     ARROW_TO_DELTA_COMPATIBLE_ARROW_TYPE_MAP = {
         # maps type check function to type factory function
@@ -30,10 +31,18 @@ def ensure_delta_compatible_arrow_table(table: pa.table) -> pa.Table:
         pa.types.is_time: pa.string(),
         pa.types.is_decimal256: pa.string(),  # pyarrow does not allow downcasting to decimal128
     }
-    adjusted_schema = cast_arrow_schema_types(
-        table.schema, ARROW_TO_DELTA_COMPATIBLE_ARROW_TYPE_MAP
-    )
-    return table.cast(adjusted_schema)
+    return cast_arrow_schema_types(schema, ARROW_TO_DELTA_COMPATIBLE_ARROW_TYPE_MAP)
+
+
+def ensure_delta_compatible_arrow_data(
+    data: Union[pa.Table, pa.RecordBatchReader]
+) -> Union[pa.Table, pa.RecordBatchReader]:
+    """Returns Arrow data compatible with Delta table format.
+
+    Casts `data` schema to replace data types not supported by Delta.
+ """ + schema = ensure_delta_compatible_arrow_schema(data.schema) + return data.cast(schema) def get_delta_write_mode(write_disposition: TWriteDisposition) -> str: @@ -50,21 +59,19 @@ def get_delta_write_mode(write_disposition: TWriteDisposition) -> str: def write_delta_table( - path: str, - data: Union[pa.Table, pa.dataset.Dataset], + table_or_uri: Union[str, Path, DeltaTable], + data: Union[pa.Table, pa.RecordBatchReader], write_disposition: TWriteDisposition, storage_options: Optional[Dict[str, str]] = None, ) -> None: """Writes in-memory Arrow table to on-disk Delta table.""" - table = dataset_to_table(data) - # throws warning for `s3` protocol: https://github.com/delta-io/delta-rs/issues/2460 # TODO: upgrade `deltalake` lib after https://github.com/delta-io/delta-rs/pull/2500 # is released write_deltalake( # type: ignore[call-overload] - table_or_uri=path, - data=ensure_delta_compatible_arrow_table(table), + table_or_uri=table_or_uri, + data=ensure_delta_compatible_arrow_data(data), mode=get_delta_write_mode(write_disposition), schema_mode="merge", # enable schema evolution (adding new columns) storage_options=storage_options, diff --git a/dlt/common/libs/pyarrow.py b/dlt/common/libs/pyarrow.py index ee249b111c..9d3e97421c 100644 --- a/dlt/common/libs/pyarrow.py +++ b/dlt/common/libs/pyarrow.py @@ -474,10 +474,6 @@ def pq_stream_with_new_columns( yield tbl -def dataset_to_table(data: Union[pyarrow.Table, pyarrow.dataset.Dataset]) -> pyarrow.Table: - return data.to_table() if isinstance(data, pyarrow.dataset.Dataset) else data - - def cast_arrow_schema_types( schema: pyarrow.Schema, type_map: Dict[Callable[[pyarrow.DataType], bool], Callable[..., pyarrow.DataType]], diff --git a/dlt/common/utils.py b/dlt/common/utils.py index 7109daf497..c1d130e477 100644 --- a/dlt/common/utils.py +++ b/dlt/common/utils.py @@ -10,6 +10,8 @@ from types import ModuleType import traceback import zlib +from importlib.metadata import version as pkg_version +from packaging.version import Version from typing import ( Any, @@ -29,7 +31,12 @@ Iterable, ) -from dlt.common.exceptions import DltException, ExceptionTrace, TerminalException +from dlt.common.exceptions import ( + DltException, + ExceptionTrace, + TerminalException, + DependencyVersionException, +) from dlt.common.typing import AnyFun, StrAny, DictStrAny, StrStr, TAny, TFun @@ -565,3 +572,14 @@ def order_deduped(lst: List[Any]) -> List[Any]: Only works for lists with hashable elements. 
""" return list(dict.fromkeys(lst)) + + +def assert_min_pkg_version(pkg_name: str, version: str, msg: str = "") -> None: + version_found = pkg_version(pkg_name) + if Version(version_found) < Version(version): + raise DependencyVersionException( + pkg_name=pkg_name, + version_found=version_found, + version_required=">=" + version, + appendix=msg, + ) diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 5da367ea03..ef4702b17d 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -9,6 +9,7 @@ import dlt from dlt.common import logger, time, json, pendulum +from dlt.common.utils import assert_min_pkg_version from dlt.common.storages.fsspec_filesystem import glob_files from dlt.common.typing import DictStrAny from dlt.common.schema import Schema, TSchemaTables, TTableSchema @@ -122,23 +123,43 @@ def __init__( def write(self) -> None: from dlt.common.libs.pyarrow import pyarrow as pa from dlt.common.libs.deltalake import ( + DeltaTable, write_delta_table, + ensure_delta_compatible_arrow_schema, _deltalake_storage_options, try_get_deltatable, ) - file_paths = [job.file_path for job in self.table_jobs] + assert_min_pkg_version( + pkg_name="pyarrow", + version="17.0.0", + msg="`pyarrow>=17.0.0` is needed for `delta` table format on `filesystem` destination.", + ) - if ( - self.table["write_disposition"] == "merge" - and ( - dt := try_get_deltatable( - self.client.make_remote_uri(self.make_remote_path()), - storage_options=_deltalake_storage_options(self.client.config), + # create Arrow dataset from Parquet files + file_paths = [job.file_path for job in self.table_jobs] + arrow_ds = pa.dataset.dataset(file_paths) + + # create Delta table object + dt_path = self.client.make_remote_uri(self.make_remote_path()) + storage_options = _deltalake_storage_options(self.client.config) + dt = try_get_deltatable(dt_path, storage_options=storage_options) + + # explicitly check if there is data + # (https://github.com/delta-io/delta-rs/issues/2686) + if arrow_ds.head(1).num_rows == 0: + if dt is None: + # create new empty Delta table with schema from Arrow table + DeltaTable.create( + table_uri=dt_path, + schema=ensure_delta_compatible_arrow_schema(arrow_ds.schema), + mode="overwrite", ) - ) - is not None - ): + return + + arrow_rbr = arrow_ds.scanner().to_reader() # RecordBatchReader + + if self.table["write_disposition"] == "merge" and dt is not None: assert self.table["x-merge-strategy"] in self.client.capabilities.supported_merge_strategies # type: ignore[typeddict-item] if self.table["x-merge-strategy"] == "upsert": # type: ignore[typeddict-item] @@ -151,7 +172,7 @@ def write(self) -> None: qry = ( dt.merge( - source=pa.dataset.dataset(file_paths), + source=arrow_rbr, predicate=predicate, source_alias="source", target_alias="target", @@ -164,10 +185,10 @@ def write(self) -> None: else: write_delta_table( - path=self.client.make_remote_uri(self.make_remote_path()), - data=pa.dataset.dataset(file_paths), + table_or_uri=dt_path if dt is None else dt, + data=arrow_rbr, write_disposition=self.table["write_disposition"], - storage_options=_deltalake_storage_options(self.client.config), + storage_options=storage_options, ) def make_remote_path(self) -> str: diff --git a/pytest.ini b/pytest.ini index 07de69d3e3..1d4e0df6dc 100644 --- a/pytest.ini +++ b/pytest.ini @@ -10,4 +10,5 @@ python_functions = *_test test_* *_snippet filterwarnings= ignore::DeprecationWarning markers = essential: marks all 
-    no_load: marks tests that do not load anything
\ No newline at end of file
+    no_load: marks tests that do not load anything
+    needspyarrow17: marks tests that need pyarrow>=17.0.0 (deselected by default)
\ No newline at end of file
diff --git a/tests/cases.py b/tests/cases.py
index fa346b8b49..aa2e8ed494 100644
--- a/tests/cases.py
+++ b/tests/cases.py
@@ -303,6 +303,7 @@ def arrow_table_all_data_types(
     include_date: bool = True,
     include_not_normalized_name: bool = True,
     include_name_clash: bool = False,
+    include_null: bool = True,
     num_rows: int = 3,
     tz="UTC",
 ) -> Tuple[Any, List[Dict[str, Any]], Dict[str, List[Any]]]:
@@ -323,9 +324,11 @@ def arrow_table_all_data_types(
         "float_null": [round(random.uniform(0, 100), 4) for _ in range(num_rows - 1)] + [
             None
         ],  # decrease precision
-        "null": pd.Series([None for _ in range(num_rows)]),
     }
 
+    if include_null:
+        data["null"] = pd.Series([None for _ in range(num_rows)])
+
     if include_name_clash:
         data["pre Normalized Column"] = [random.choice(ascii_lowercase) for _ in range(num_rows)]
         include_not_normalized_name = True
@@ -373,7 +376,7 @@ def arrow_table_all_data_types(
                 "Pre Normalized Column": "pre_normalized_column",
             }
         )
-        .drop(columns=["null"])
+        .drop(columns=(["null"] if include_null else []))
         .to_dict("records")
     )
     if object_format == "object":
diff --git a/tests/conftest.py b/tests/conftest.py
index e819e26ebb..6c0384ea8a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,7 +1,11 @@
 import os
 import dataclasses
 import logging
-from typing import List
+import sys
+import pytest
+from typing import List, Iterator
+from importlib.metadata import version as pkg_version
+from packaging.version import Version
 
 # patch which providers to enable
 from dlt.common.configuration.providers import (
@@ -142,3 +146,11 @@ def _create_pipeline_instance_id(self) -> str:
 
     except Exception:
         pass
+
+
+@pytest.fixture(autouse=True)
+def pyarrow17_check(request) -> Iterator[None]:
+    if "needspyarrow17" in request.keywords:
+        if "pyarrow" not in sys.modules or Version(pkg_version("pyarrow")) < Version("17.0.0"):
+            pytest.skip("test needs `pyarrow>=17.0.0`")
+    yield
diff --git a/tests/libs/test_deltalake.py b/tests/libs/test_deltalake.py
index d55f788fbe..a162ff427b 100644
--- a/tests/libs/test_deltalake.py
+++ b/tests/libs/test_deltalake.py
@@ -1,5 +1,5 @@
 import os
-from typing import Iterator, Tuple, cast
+from typing import Iterator, Tuple, Union, cast
 
 import pytest
 from deltalake import DeltaTable
@@ -76,7 +76,21 @@ def test_deltalake_storage_options() -> None:
     assert _deltalake_storage_options(config)["aws_access_key_id"] == "i_will_overwrite"
 
 
-def test_write_delta_table(filesystem_client) -> None:
+@pytest.mark.needspyarrow17
+@pytest.mark.parametrize("arrow_data_type", (pa.Table, pa.RecordBatchReader))
+def test_write_delta_table(
+    filesystem_client,
+    arrow_data_type: Union[pa.Table, pa.RecordBatchReader],
+) -> None:
+    def arrow_data(  # type: ignore[return]
+        arrow_table: pa.Table,
+        return_type: Union[pa.Table, pa.RecordBatchReader],
+    ) -> Union[pa.Table, pa.RecordBatchReader]:
+        if return_type == pa.Table:
+            return arrow_table
+        elif return_type == pa.RecordBatchReader:
+            return arrow_table.to_reader()
+
     client, remote_dir = filesystem_client
     client = cast(FilesystemClient, client)
     storage_options = _deltalake_storage_options(client.config)
@@ -102,7 +116,10 @@ def test_write_delta_table(filesystem_client) -> None:
 
     # first write should create Delta table with same shape as input Arrow table
     write_delta_table(
-        remote_dir, arrow_table, write_disposition="append", storage_options=storage_options
+        remote_dir,
+        arrow_data(arrow_table, arrow_data_type),
+        write_disposition="append",
+        storage_options=storage_options,
     )
     dt = DeltaTable(remote_dir, storage_options=storage_options)
     assert dt.version() == 0
@@ -117,7 +134,10 @@ def test_write_delta_table(filesystem_client) -> None:
 
     # another `append` should create a new table version with twice the number of rows
     write_delta_table(
-        remote_dir, arrow_table, write_disposition="append", storage_options=storage_options
+        remote_dir,
+        arrow_data(arrow_table, arrow_data_type),
+        write_disposition="append",
+        storage_options=storage_options,
     )
     dt = DeltaTable(remote_dir, storage_options=storage_options)
     assert dt.version() == 1
@@ -125,7 +145,10 @@ def test_write_delta_table(filesystem_client) -> None:
 
     # the `replace` write disposition should trigger a "logical delete"
     write_delta_table(
-        remote_dir, arrow_table, write_disposition="replace", storage_options=storage_options
+        remote_dir,
+        arrow_data(arrow_table, arrow_data_type),
+        write_disposition="replace",
+        storage_options=storage_options,
     )
     dt = DeltaTable(remote_dir, storage_options=storage_options)
     assert dt.version() == 2
@@ -137,7 +160,10 @@ def test_write_delta_table(filesystem_client) -> None:
 
     # `merge` should resolve to `append` behavior
    write_delta_table(
-        remote_dir, arrow_table, write_disposition="merge", storage_options=storage_options
+        remote_dir,
+        arrow_data(arrow_table, arrow_data_type),
+        write_disposition="merge",
+        storage_options=storage_options,
    )
     dt = DeltaTable(remote_dir, storage_options=storage_options)
     assert dt.version() == 3
@@ -153,7 +179,10 @@ def test_write_delta_table(filesystem_client) -> None:
 
     # new column should be propagated to Delta table (schema evolution is supported)
     write_delta_table(
-        remote_dir, evolved_arrow_table, write_disposition="append", storage_options=storage_options
+        remote_dir,
+        arrow_data(evolved_arrow_table, arrow_data_type),
+        write_disposition="append",
+        storage_options=storage_options,
     )
     dt = DeltaTable(remote_dir, storage_options=storage_options)
     assert dt.version() == 4
@@ -164,7 +193,10 @@ def test_write_delta_table(filesystem_client) -> None:
 
     # providing a subset of columns should lead to missing columns being null-filled
     write_delta_table(
-        remote_dir, arrow_table, write_disposition="append", storage_options=storage_options
+        remote_dir,
+        arrow_data(arrow_table, arrow_data_type),
+        write_disposition="append",
+        storage_options=storage_options,
     )
     dt = DeltaTable(remote_dir, storage_options=storage_options)
     assert dt.version() == 5
@@ -176,7 +208,7 @@ def test_write_delta_table(filesystem_client) -> None:
         # unsupported value for `write_disposition` should raise ValueError
         write_delta_table(
             remote_dir,
-            arrow_table,
+            arrow_data(arrow_table, arrow_data_type),
             write_disposition="foo",  # type:ignore[arg-type]
             storage_options=storage_options,
         )
diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py
index 2f0e074621..308a1d516c 100644
--- a/tests/load/pipeline/test_filesystem_pipeline.py
+++ b/tests/load/pipeline/test_filesystem_pipeline.py
@@ -3,6 +3,8 @@
 import posixpath
 from pathlib import Path
 from typing import Any, Callable, List, Dict, cast
+from importlib.metadata import version as pkg_version
+from packaging.version import Version
 
 from pytest_mock import MockerFixture
 import dlt
@@ -12,6 +14,7 @@
 from dlt.common import pendulum
 from dlt.common.storages.load_package import ParsedLoadJobFileName
 from dlt.common.utils import uniq_id
+from dlt.common.exceptions import DependencyVersionException
 from dlt.destinations import filesystem
 from dlt.destinations.impl.filesystem.filesystem import FilesystemClient
 from dlt.destinations.impl.filesystem.typing import TExtraPlaceholders
@@ -25,6 +28,7 @@
     destinations_configs,
     DestinationTestConfiguration,
     MEMORY_BUCKET,
+    FILE_BUCKET,
 )
 from tests.pipeline.utils import load_table_counts, assert_load_info, load_tables_to_dicts
 
@@ -218,6 +222,29 @@ def some_source():
     assert table.column("value").to_pylist() == [1, 2, 3, 4, 5]
 
 
+def test_delta_table_pyarrow_version_check() -> None:
+    """Tests pyarrow version checking for `delta` table format.
+
+    DependencyVersionException should be raised if pyarrow<17.0.0.
+    """
+    # test intentionally does not use destination_configs(), because that
+    # function automatically marks `delta` table format configs as
+    # `needspyarrow17`, which should not happen for this test to run in an
+    # environment where pyarrow<17.0.0
+
+    assert Version(pkg_version("pyarrow")) < Version("17.0.0"), "test assumes `pyarrow<17.0.0`"
+
+    @dlt.resource(table_format="delta")
+    def foo():
+        yield {"foo": 1, "bar": 2}
+
+    pipeline = dlt.pipeline(destination=filesystem(FILE_BUCKET))
+
+    with pytest.raises(PipelineStepFailed) as pip_ex:
+        pipeline.run(foo())
+    assert isinstance(pip_ex.value.__context__, DependencyVersionException)
+
+
 @pytest.mark.essential
 @pytest.mark.parametrize(
     "destination_config",
@@ -404,6 +431,96 @@ def complex_table():
         assert len(rows_dict["complex_table__child__grandchild"]) == 5
 
 
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(
+        local_filesystem_configs=True,
+        table_format="delta",
+    ),
+    ids=lambda x: x.name,
+)
+def test_delta_table_empty_source(
+    destination_config: DestinationTestConfiguration,
+) -> None:
+    """Tests empty source handling for `delta` table format.
+
+    Tests both empty Arrow table and `dlt.mark.materialize_table_schema()`.
+ """ + from dlt.common.libs.pyarrow import pyarrow as pa + from dlt.common.libs.deltalake import ensure_delta_compatible_arrow_data + from tests.pipeline.utils import _get_delta_table, users_materialize_table_schema + + @dlt.resource(table_format="delta") + def delta_table(data): + yield data + + # create empty Arrow table with schema + arrow_table = arrow_table_all_data_types( + "arrow-table", + include_decimal_default_precision=False, + include_decimal_arrow_max_precision=True, + include_not_normalized_name=False, + include_null=False, + num_rows=2, + )[0] + empty_arrow_table = arrow_table.schema.empty_table() + assert empty_arrow_table.num_rows == 0 # it's empty + assert empty_arrow_table.schema.equals(arrow_table.schema) # it has a schema + + pipeline = destination_config.setup_pipeline("fs_pipe", dev_mode=True) + + # run 1: empty Arrow table with schema + # this should create empty Delta table with same schema as Arrow table + info = pipeline.run(delta_table(empty_arrow_table)) + assert_load_info(info) + client = cast(FilesystemClient, pipeline.destination_client()) + dt = _get_delta_table(client, "delta_table") + assert dt.version() == 0 + dt_arrow_table = dt.to_pyarrow_table() + assert dt_arrow_table.shape == (0, empty_arrow_table.num_columns) + assert dt_arrow_table.schema.equals( + ensure_delta_compatible_arrow_data(empty_arrow_table).schema + ) + + # run 2: non-empty Arrow table with same schema as run 1 + # this should load records into Delta table + info = pipeline.run(delta_table(arrow_table)) + assert_load_info(info) + dt = _get_delta_table(client, "delta_table") + assert dt.version() == 1 + dt_arrow_table = dt.to_pyarrow_table() + assert dt_arrow_table.shape == (2, empty_arrow_table.num_columns) + assert dt_arrow_table.schema.equals( + ensure_delta_compatible_arrow_data(empty_arrow_table).schema + ) + + # run 3: empty Arrow table with different schema + # this should not alter the Delta table + empty_arrow_table_2 = pa.schema( + [pa.field("foo", pa.int64()), pa.field("bar", pa.string())] + ).empty_table() + + info = pipeline.run(delta_table(empty_arrow_table_2)) + assert_load_info(info) + dt = _get_delta_table(client, "delta_table") + assert dt.version() == 1 # still 1, no new commit was done + dt_arrow_table = dt.to_pyarrow_table() + assert dt_arrow_table.shape == (2, empty_arrow_table.num_columns) # shape did not change + assert dt_arrow_table.schema.equals( # schema did not change + ensure_delta_compatible_arrow_data(empty_arrow_table).schema + ) + + # test `dlt.mark.materialize_table_schema()` + users_materialize_table_schema.apply_hints(table_format="delta") + info = pipeline.run(users_materialize_table_schema()) + assert_load_info(info) + dt = _get_delta_table(client, "users") + assert dt.version() == 0 + dt_arrow_table = dt.to_pyarrow_table() + assert dt_arrow_table.num_rows == 0 + assert "id", "name" == dt_arrow_table.schema.names[:2] + + @pytest.mark.parametrize( "destination_config", destinations_configs( diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py index cc11236d93..8b41c354b2 100644 --- a/tests/load/pipeline/test_scd2.py +++ b/tests/load/pipeline/test_scd2.py @@ -101,6 +101,10 @@ def test_core_functionality( validity_column_names: List[str], active_record_timestamp: Optional[pendulum.DateTime], ) -> None: + # somehow destination_config comes through as ParameterSet instead of + # DestinationTestConfiguration + destination_config = destination_config.values[0] # type: ignore[attr-defined] + p = 
 
     @dlt.resource(
diff --git a/tests/load/utils.py b/tests/load/utils.py
index 5c8689587f..66211b1701 100644
--- a/tests/load/utils.py
+++ b/tests/load/utils.py
@@ -2,7 +2,7 @@
 import contextlib
 import codecs
 import os
-from typing import Any, Iterator, List, Sequence, IO, Tuple, Optional, Dict, Union, Generator
+from typing import Any, Iterator, List, Sequence, IO, Tuple, Optional, Dict, Union, Generator, cast
 import shutil
 from pathlib import Path
 from urllib.parse import urlparse
@@ -591,6 +591,18 @@ def destinations_configs(
             conf for conf in destination_configs if conf.force_iceberg is force_iceberg
         ]
 
+    # add marks
+    destination_configs = [
+        cast(
+            DestinationTestConfiguration,
+            pytest.param(
+                conf,
+                marks=pytest.mark.needspyarrow17 if conf.table_format == "delta" else [],
+            ),
+        )
+        for conf in destination_configs
+    ]
+
     return destination_configs
 
 
diff --git a/tests/pipeline/test_pipeline_extra.py b/tests/pipeline/test_pipeline_extra.py
index 308cdcd91d..ea6f54b0c6 100644
--- a/tests/pipeline/test_pipeline_extra.py
+++ b/tests/pipeline/test_pipeline_extra.py
@@ -437,28 +437,20 @@ def pandas_incremental(numbers=dlt.sources.incremental("Numbers")):
 
 def test_empty_parquet(test_storage: FileStorage) -> None:
     from dlt.destinations import filesystem
+    from tests.pipeline.utils import users_materialize_table_schema
 
     local = filesystem(os.path.abspath(TEST_STORAGE_ROOT))
 
     # we have two options to materialize columns: add columns hint or use dlt.mark to emit schema
     # at runtime. below we use the second option
-    @dlt.resource
-    def users():
-        yield dlt.mark.with_hints(
-            # this is a special empty item which will materialize table schema
-            dlt.mark.materialize_table_schema(),
-            # emit table schema with the item
-            dlt.mark.make_hints(
-                columns=[
-                    {"name": "id", "data_type": "bigint", "precision": 4, "nullable": False},
-                    {"name": "name", "data_type": "text", "nullable": False},
-                ]
-            ),
-        )
-
     # write parquet file to storage
-    info = dlt.run(users, destination=local, loader_file_format="parquet", dataset_name="user_data")
+    info = dlt.run(
+        users_materialize_table_schema,
+        destination=local,
+        loader_file_format="parquet",
+        dataset_name="user_data",
+    )
     assert_load_info(info)
     assert set(info.pipeline.default_schema.tables["users"]["columns"].keys()) == {"id", "name", "_dlt_load_id", "_dlt_id"}  # type: ignore
 
     # find parquet file
diff --git a/tests/pipeline/utils.py b/tests/pipeline/utils.py
index 7016f5ea5e..bd62f76dc1 100644
--- a/tests/pipeline/utils.py
+++ b/tests/pipeline/utils.py
@@ -76,6 +76,21 @@ def many_delayed(many, iters):
         yield dlt.resource(run_deferred(iters), name="resource_" + str(n))
 
 
+@dlt.resource(table_name="users")
+def users_materialize_table_schema():
+    yield dlt.mark.with_hints(
+        # this is a special empty item which will materialize table schema
+        dlt.mark.materialize_table_schema(),
+        # emit table schema with the item
+        dlt.mark.make_hints(
+            columns=[
+                {"name": "id", "data_type": "bigint", "precision": 4, "nullable": False},
+                {"name": "name", "data_type": "text", "nullable": False},
+            ]
+        ),
+    )
+
+
 #
 # Utils for accessing data in pipelines
 #
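
A minimal usage sketch (illustrative only, not part of the patch): the new `assert_min_pkg_version` helper added in `dlt/common/utils.py` raises the new `DependencyVersionException` from `dlt/common/exceptions.py` when the installed package is older than required; the `pyarrow` bound and message mirror the call made in the `filesystem` destination above.

```python
from dlt.common.utils import assert_min_pkg_version
from dlt.common.exceptions import DependencyVersionException

try:
    # no-op when the installed pyarrow satisfies the bound,
    # raises DependencyVersionException otherwise
    assert_min_pkg_version(
        pkg_name="pyarrow",
        version="17.0.0",
        msg="`pyarrow>=17.0.0` is needed for `delta` table format on `filesystem` destination.",
    )
except DependencyVersionException as exc:
    # message reads: Found `pyarrow==<installed>`, while `pyarrow>=17.0.0` is required.
    print(exc)
```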