diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/clean_tmp_tables.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/clean_tmp_tables.sql
new file mode 100644
index 000000000000..46e2328745f1
--- /dev/null
+++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/clean_tmp_tables.sql
@@ -0,0 +1,19 @@
+{% macro clean_tmp_tables(schemas) -%}
+    {{ adapter.dispatch('clean_tmp_tables')(schemas) }}
+{%- endmacro %}
+
+-- default
+{% macro default__clean_tmp_tables(schemas) -%}
+    {% do exceptions.warn("\tINFO: CLEANING TEST LEFTOVERS IS NOT IMPLEMENTED FOR THIS DESTINATION. CONSIDER REMOVING TEST TABLES MANUALLY.\n") %}
+{%- endmacro %}
+
+-- for redshift
+{% macro redshift__clean_tmp_tables(schemas) %}
+    {%- for tmp_schema in schemas -%}
+        {% do log("\tDROP SCHEMA IF EXISTS " ~ tmp_schema, info=True) %}
+        {%- set drop_query -%}
+            drop schema if exists {{ tmp_schema }} cascade;
+        {%- endset -%}
+        {%- do run_query(drop_query) -%}
+    {%- endfor -%}
+{% endmacro %}
\ No newline at end of file
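Note: `clean_tmp_tables` resolves its implementation through dbt's `adapter.dispatch`, so supporting another destination only means adding a macro named `<adapter>__clean_tmp_tables` to this file. A minimal sketch of what a Snowflake override could look like (hypothetical, not part of this change; it assumes the destination accepts `drop schema ... cascade`):

    {% macro snowflake__clean_tmp_tables(schemas) %}
        {%- for tmp_schema in schemas -%}
            {# drop the tmp schema and everything it contains #}
            {%- do run_query("drop schema if exists " ~ tmp_schema ~ " cascade;") -%}
        {%- endfor -%}
    {% endmacro %}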
+ """ + normalization_image: str = self.get_normalization_image(destination_type) + # Compile dbt models files into destination sql dialect, then run the transformation queries + assert self.run_dbt_run_operation(normalization_image, test_root_dir, macro, macro_args) + + def run_check_dbt_command(self, normalization_image: str, command: str, cwd: str, force_full_refresh: bool = False) -> bool: """ Run dbt subprocess while checking and counting for "ERROR", "FAIL" or "WARNING" printed in its outputs """ @@ -424,7 +433,6 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str, forc else: dbtAdditionalArgs = ["--event-buffer-size=10000"] - error_count = 0 commands = ( [ "docker", @@ -458,6 +466,45 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str, forc command = f"{command} --full-refresh" print("Executing: ", " ".join(commands)) print(f"Equivalent to: dbt {command} --profiles-dir={cwd} --project-dir={cwd}") + return self.run_check_dbt_subprocess(commands, cwd) + + def run_dbt_run_operation(self, normalization_image: str, cwd: str, macro: str, macro_args: str = None) -> bool: + """ + Run dbt subprocess while checking and counting for "ERROR", "FAIL" or "WARNING" printed in its outputs + """ + args = ["--args", macro_args] if macro_args else [] + commands = ( + [ + "docker", + "run", + "--rm", + "--init", + "-v", + f"{cwd}:/workspace", + "-v", + f"{cwd}/build:/build", + "-v", + f"{cwd}/logs:/logs", + "-v", + f"{cwd}/build/dbt_packages:/dbt", + "--network", + "host", + "--entrypoint", + "/usr/local/bin/dbt", + "-i", + normalization_image, + ] + + ["run-operation", macro] + + args + + ["--profiles-dir=/workspace", "--project-dir=/workspace"] + ) + + print("Executing: ", " ".join(commands)) + print(f"Equivalent to: dbt run-operation {macro} --args {macro_args} --profiles-dir={cwd} --project-dir={cwd}") + return self.run_check_dbt_subprocess(commands, cwd) + + def run_check_dbt_subprocess(self, commands: list, cwd: str): + error_count = 0 with open(os.path.join(cwd, "dbt_output.log"), "ab") as f: process = subprocess.Popen(commands, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=os.environ) for line in iter(lambda: process.stdout.readline(), b""): @@ -552,3 +599,94 @@ def update_yaml_file(filename: str, callback: Callable): updated, config = callback(config) if updated: write_yaml_config(config, filename) + + def clean_tmp_tables( + self, + destination_type: Union[DestinationType, List[DestinationType]], + test_type: str, + tmp_folders: list = None, + git_versioned_tests: list = None, + ): + """ + Cleans-up all temporary schemas created during the test session. + It parses the provided tmp_folders: List[str] or uses `git_versioned_tests` to find sources.yml files generated for the tests. + It gets target schemas created by the tests and removes them using custom scenario specified in + `dbt-project-template/macros/clean_tmp_tables.sql` macro. + + REQUIREMENTS: + 1) Idealy, the schemas should have unique names like: test_normalization_ to avoid conflicts. + 2) The `clean_tmp_tables.sql` macro should have the specific macro for target destination to proceed. + + INPUT ARGUMENTS: + :: destination_type : either single destination or list of destinations + :: test_type: either "ephemeral" or "normalization" should be supplied. 
@@ -552,3 +599,94 @@ def update_yaml_file(filename: str, callback: Callable):
         updated, config = callback(config)
         if updated:
             write_yaml_config(config, filename)
+
+    def clean_tmp_tables(
+        self,
+        destination_type: Union[DestinationType, List[DestinationType]],
+        test_type: str,
+        tmp_folders: list = None,
+        git_versioned_tests: list = None,
+    ):
+        """
+        Cleans up all temporary schemas created during the test session.
+        It parses the provided `tmp_folders` list or uses `git_versioned_tests` to find the sources.yml files generated for the tests,
+        collects the target schemas created by the tests, and removes them using the destination-specific scenario defined in
+        the `dbt-project-template/macros/clean_tmp_tables.sql` macro.
+
+        REQUIREMENTS:
+        1) Ideally, the schemas should have unique names (e.g. prefixed with test_normalization_) to avoid conflicts.
+        2) `clean_tmp_tables.sql` must define a macro for the target destination; otherwise only a warning is emitted.
+
+        INPUT ARGUMENTS:
+        :: destination_type : either a single destination or a list of destinations
+        :: test_type: either "ephemeral" or "normalization" should be supplied.
+        :: tmp_folders: should be supplied if test_type = "ephemeral", to get schemas from /build/normalization_test_output folders
+        :: git_versioned_tests: should be supplied if test_type = "normalization", to get schemas from integration_tests/normalization_test_output folders
+
+        EXAMPLE:
+            clean_up_args = {
+                "destination_type": [ DestinationType.REDSHIFT, DestinationType.POSTGRES, ... ],
+                "test_type": "normalization",
+                "git_versioned_tests": git_versioned_tests,
+            }
+        """
+
+        path_to_sources: str = "/models/generated/sources.yml"
+        test_folders: dict = {}
+        source_files: dict = {}
+        schemas_to_remove: dict = {}
+
+        # collecting information about tmp_tables created for the test for each destination
+        for destination in destination_type:
+            test_folders[destination.value] = []
+            source_files[destination.value] = []
+            schemas_to_remove[destination.value] = []
+
+            # based on test_type select path to source files
+            if test_type == "ephemeral":
+                if not tmp_folders:
+                    raise TypeError("`tmp_folders` arg is not provided.")
+                for folder in tmp_folders:
+                    if destination.value in folder:
+                        test_folders[destination.value].append(folder)
+                        source_files[destination.value].append(f"{folder}{path_to_sources}")
+            elif test_type == "normalization":
+                if not git_versioned_tests:
+                    raise TypeError("`git_versioned_tests` arg is not provided.")
+                base_path = f"{pathlib.Path().absolute()}/integration_tests/normalization_test_output"
+                for test in git_versioned_tests:
+                    test_root_dir: str = f"{base_path}/{destination.value}/{test}"
+                    test_folders[destination.value].append(test_root_dir)
+                    source_files[destination.value].append(f"{test_root_dir}{path_to_sources}")
+            else:
+                raise TypeError(f"\n`test_type`: {test_type} is not registered, use `ephemeral` or `normalization` instead.\n")
+
+            # parse sources.yml files from test folders to get schemas and table names created for the tests
+            for file in source_files[destination.value]:
+                source_yml = {}
+                try:
+                    with open(file, "r") as source_file:
+                        source_yml = yaml.safe_load(source_file)
+                except FileNotFoundError:
+                    print(f"\n{destination.value}: {file} doesn't exist, consider removing any temp tables and schemas manually!\n")
+                    pass
+                test_sources: list = source_yml.get("sources", []) if source_yml else []
+
+                for source in test_sources:
+                    target_schema: str = source.get("name")
+                    if target_schema not in schemas_to_remove[destination.value]:
+                        schemas_to_remove[destination.value].append(target_schema)
+                        # adding _airbyte_* tmp schemas to be removed
+                        schemas_to_remove[destination.value].append(f"_airbyte_{target_schema}")
+
+        # cleaning up tmp_tables generated by the tests
+        for destination in destination_type:
+            if not schemas_to_remove[destination.value]:
+                print(f"\n\t{destination.value.upper()} DESTINATION: SKIP CLEANING, NOTHING TO REMOVE.\n")
+            else:
+                print(f"\n\t{destination.value.upper()} DESTINATION: CLEANING LEFTOVERS...\n")
+                print(f"\t{schemas_to_remove[destination.value]}\n")
+                test_root_folder = test_folders[destination.value][0]
+                args = json.dumps({"schemas": schemas_to_remove[destination.value]})
+                self.dbt_check(destination, test_root_folder)
+                self.dbt_run_macro(destination, test_root_folder, "clean_tmp_tables", args)
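Note: given the arguments assembled by `clean_tmp_tables`, `run_dbt_run_operation` executes the equivalent of the following dbt CLI call inside the normalization image (the schema names below are illustrative; the actual list is whatever was collected from the generated sources.yml files):

    dbt run-operation clean_tmp_tables \
        --args '{"schemas": ["test_normalization_xyz", "_airbyte_test_normalization_xyz"]}' \
        --profiles-dir=/workspace --project-dir=/workspace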
diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py b/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py
index 22d968ec5da5..9e86a5771e33 100644
--- a/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py
+++ b/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py
@@ -23,6 +23,12 @@
 @pytest.fixture(scope="module", autouse=True)
 def before_all_tests(request):
     destinations_to_test = dbt_test_utils.get_test_targets()
+    # set clean-up args to clean target destination after the test
+    clean_up_args = {
+        "destination_type": [d for d in DestinationType if d.value in destinations_to_test],
+        "test_type": "ephemeral",
+        "tmp_folders": temporary_folders,
+    }
     if DestinationType.POSTGRES.value not in destinations_to_test:
         destinations_to_test.append(DestinationType.POSTGRES.value)
     dbt_test_utils.set_target_schema("test_ephemeral")
@@ -30,6 +36,7 @@ def before_all_tests(request):
     dbt_test_utils.setup_db(destinations_to_test)
     os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"]
     yield
+    dbt_test_utils.clean_tmp_tables(**clean_up_args)
     dbt_test_utils.tear_down_db()
     for folder in temporary_folders:
         print(f"Deleting temporary test folder {folder}")
@@ -91,6 +98,9 @@ def run_test(destination_type: DestinationType, column_count: int, expected_exce
     if destination_type.value == DestinationType.ORACLE.value:
         # Oracle does not allow changing to random schema
         dbt_test_utils.set_target_schema("test_normalization")
+    elif destination_type.value == DestinationType.REDSHIFT.value:
+        # set unique schema for Redshift test
+        dbt_test_utils.set_target_schema(dbt_test_utils.generate_random_string("test_ephemeral_"))
     else:
         dbt_test_utils.set_target_schema("test_ephemeral")
     print("Testing ephemeral")
diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py b/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py
index 8c6485796ed0..0c72fddf76a7 100644
--- a/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py
+++ b/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py
@@ -28,6 +28,12 @@
 @pytest.fixture(scope="module", autouse=True)
 def before_all_tests(request):
     destinations_to_test = dbt_test_utils.get_test_targets()
+    # set clean-up args to clean target destination after the test
+    clean_up_args = {
+        "destination_type": [d for d in DestinationType if d.value in destinations_to_test],
+        "test_type": "normalization",
+        "git_versioned_tests": git_versioned_tests,
+    }
 for integration_type in [d.value for d in DestinationType]:
         if integration_type in destinations_to_test:
             test_root_dir = f"{pathlib.Path().absolute()}/normalization_test_output/{integration_type.lower()}"
@@ -39,11 +45,11 @@ def before_all_tests(request):
     dbt_test_utils.setup_db(destinations_to_test)
     os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"]
     yield
+    dbt_test_utils.clean_tmp_tables(**clean_up_args)
     dbt_test_utils.tear_down_db()
     for folder in temporary_folders:
         print(f"Deleting temporary test folder {folder}")
         shutil.rmtree(folder, ignore_errors=True)
-    # TODO delete target_schema in destination by copying dbt_project.yml and injecting a on-run-end hook to clean up
 
 
 @pytest.fixture
@@ -78,6 +84,9 @@ def test_normalization(destination_type: DestinationType, test_resource_name: st
     if destination_type.value == DestinationType.ORACLE.value:
         # Oracle does not allow changing to random schema
         dbt_test_utils.set_target_schema("test_normalization")
+    elif destination_type.value == DestinationType.REDSHIFT.value:
+        # set unique schema for Redshift test
+        dbt_test_utils.set_target_schema(dbt_test_utils.generate_random_string("test_normalization_"))
     try:
         run_test_normalization(destination_type, test_resource_name)
     finally:
@@ -498,6 +507,11 @@ def to_lower_identifier(input: re.Match) -> str:
 
 def test_redshift_normalization_migration(tmp_path, setup_test_path):
     destination_type = DestinationType.REDSHIFT
+    clean_up_args = {
+        "destination_type": [destination_type],
+        "test_type": "ephemeral",  # "ephemeral", because we parse /tmp folders
+        "tmp_folders": [str(tmp_path)],
+    }
     if destination_type.value not in dbt_test_utils.get_test_targets():
         pytest.skip(f"Destinations {destination_type} is not in NORMALIZATION_TEST_TARGET env variable")
     base_dir = pathlib.Path(os.path.realpath(os.path.join(__file__, "../..")))
@@ -535,3 +549,5 @@ def test_redshift_normalization_migration(tmp_path, setup_test_path):
     run_destination_process(destination_type, tmp_path, messages_file2, "destination_catalog.json", docker_tag="dev")
     dbt_test_utils.dbt_run(destination_type, tmp_path, force_full_refresh=False)
     dbt_test(destination_type, tmp_path)
+    # clean up the test tables created for this test
+    dbt_test_utils.clean_tmp_tables(**clean_up_args)
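Note: `clean_tmp_tables` only reads the top-level `sources` entries of the generated sources.yml files (each entry's `name` is the target schema picked up for removal), so a minimal file of the shape it parses could look like this (schema and table names are illustrative):

    sources:
      - name: test_normalization_xyz
        tables:
          - name: exchange_rate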