From 2ca03ce0330e8c1c1e94648a7f10d5385e458b2f Mon Sep 17 00:00:00 2001
From: Oleksandr Bazarnov
Date: Wed, 22 Jun 2022 17:43:34 +0300
Subject: [PATCH 01/10] redshift tests can now clean up their own tmp tables

---
 .../macros/clean_tmp_tables.sql               |  10 ++
 .../integration_tests/dbt_integration_test.py | 158 +++++++++++++++++-
 .../integration_tests/test_ephemeral.py       |   5 +
 .../integration_tests/test_normalization.py   |  22 ++-
 4 files changed, 184 insertions(+), 11 deletions(-)
 create mode 100644 airbyte-integrations/bases/base-normalization/dbt-project-template/macros/clean_tmp_tables.sql

diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/clean_tmp_tables.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/clean_tmp_tables.sql
new file mode 100644
index 000000000000..e191b60a33c4
--- /dev/null
+++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/clean_tmp_tables.sql
@@ -0,0 +1,10 @@
+-- macro to clean Redshift normalization tmp tables
+{% macro redshift__clean_tmp_tables(schemas) %}
+    {%- for tmp_schema in schemas -%}
+    {% do log("\tDROPING SCHEMA " ~ tmp_schema, info=True) %}
+    {%- set drop_query -%}
+        drop schema if exists {{ tmp_schema }} cascade;
+    {%- endset -%}
+    {%- do run_query(drop_query) -%}
+    {%- endfor -%}
+{% endmacro %}
\ No newline at end of file
diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py b/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py
index bbaaa25f536f..d577d43eca31 100644
--- a/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py
+++ b/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py
@@ -13,6 +13,8 @@
 import sys
 import threading
 import time
+import yaml
+import pathlib
 from copy import copy
 from typing import Any, Callable, Dict, List
 
@@ -26,7 +28,7 @@
 NORMALIZATION_TEST_POSTGRES_DB_PORT = "NORMALIZATION_TEST_POSTGRES_DB_PORT"
 NORMALIZATION_TEST_CLICKHOUSE_DB_PORT = "NORMALIZATION_TEST_CLICKHOUSE_DB_PORT"
 NORMALIZATION_TEST_CLICKHOUSE_DB_TCP_PORT = "NORMALIZATION_TEST_CLICKHOUSE_DB_TCP_PORT"
-
+NORMALIZATION_TYPE_TEST: List = ["ephemeral", "normalization"]
 
 class DbtIntegrationTest(object):
     def __init__(self):
@@ -413,7 +415,15 @@ def dbt_run(self, destination_type: DestinationType, test_root_dir: str, force_f
         normalization_image: str = self.get_normalization_image(destination_type)
         # Compile dbt models files into destination sql dialect, then run the transformation queries
         assert self.run_check_dbt_command(normalization_image, "run", test_root_dir, force_full_refresh)
-
+    
+    def dbt_run_macro(self, destination_type: DestinationType, test_root_dir: str, macro: str, macro_args: str = None):
+        """
+        Run the dbt CLI to perform transformations on the test raw data in the destination, using an independent macro.
+ """ + normalization_image: str = self.get_normalization_image(destination_type) + # Compile dbt models files into destination sql dialect, then run the transformation queries + assert self.run_check_dbt_macro(normalization_image, test_root_dir, macro, macro_args) + @staticmethod def run_check_dbt_command(normalization_image: str, command: str, cwd: str, force_full_refresh: bool = False) -> bool: """ @@ -423,8 +433,7 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str, forc dbtAdditionalArgs = [] else: dbtAdditionalArgs = ["--event-buffer-size=10000"] - - error_count = 0 + commands = ( [ "docker", @@ -458,6 +467,8 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str, forc command = f"{command} --full-refresh" print("Executing: ", " ".join(commands)) print(f"Equivalent to: dbt {command} --profiles-dir={cwd} --project-dir={cwd}") + + error_count = 0 with open(os.path.join(cwd, "dbt_output.log"), "ab") as f: process = subprocess.Popen(commands, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=os.environ) for line in iter(lambda: process.stdout.readline(), b""): @@ -495,7 +506,81 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str, forc if error_count > 0: return False return process.returncode == 0 - + + @staticmethod + def run_check_dbt_macro(normalization_image: str, cwd: str, macro: str, macro_args: str = None) -> bool: + """ + Run dbt subprocess while checking and counting for "ERROR", "FAIL" or "WARNING" printed in its outputs + """ + args = [ '--args', macro_args ] if macro_args else [] + commands = ( + [ + "docker", + "run", + "--rm", + "--init", + "-v", + f"{cwd}:/workspace", + "-v", + f"{cwd}/build:/build", + "-v", + f"{cwd}/logs:/logs", + "-v", + f"{cwd}/build/dbt_packages:/dbt", + "--network", + "host", + "--entrypoint", + "/usr/local/bin/dbt", + "-i", + normalization_image, + ] + + [ 'run-operation', macro ] + + args + + [ "--profiles-dir=/workspace", "--project-dir=/workspace" ] + ) + + print("Executing: ", " ".join(commands)) + print(f"Equivalent to: dbt run-operation {macro} --args {macro_args} --profiles-dir={cwd} --project-dir={cwd}") + + error_count = 0 + with open(os.path.join(cwd, "dbt_output.log"), "ab") as f: + process = subprocess.Popen(commands, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=os.environ) + for line in iter(lambda: process.stdout.readline(), b""): + f.write(line) + str_line = line.decode("utf-8") + sys.stdout.write(str_line) + # keywords to match lines as signaling errors + if "ERROR" in str_line or "FAIL" in str_line or "WARNING" in str_line: + # exception keywords in lines to ignore as errors (such as summary or expected warnings) + is_exception = False + for except_clause in [ + "Done.", # DBT Summary + "PASS=", # DBT Summary + "Nothing to do.", # When no schema/data tests are setup + "Configuration paths exist in your dbt_project.yml", # When no cte / view are generated + "Error loading config file: .dockercfg: $HOME is not defined", # ignore warning + "depends on a node named 'disabled_test' which was not found", # Tests throwing warning because it is disabled + "The requested image's platform (linux/amd64) does not match the detected host platform " + + "(linux/arm64/v8) and no specific platform was requested", # temporary patch until we publish images for arm64 + ]: + if except_clause in str_line: + is_exception = True + break + if not is_exception: + # count lines signaling an error/failure/warning + error_count += 1 + process.wait() + 
message = ( + f"{' '.join(commands)}\n\tterminated with return code {process.returncode} " + f"with {error_count} 'Error/Warning/Fail' mention(s)." + ) + print(message) + assert error_count == 0, message + assert process.returncode == 0, message + if error_count > 0: + return False + return process.returncode == 0 + @staticmethod def copy_replace(src, dst, pattern=None, replace_value=None): """ @@ -552,3 +637,66 @@ def update_yaml_file(filename: str, callback: Callable): updated, config = callback(config) if updated: write_yaml_config(config, filename) + + def clean_tmp_tables( + self, + destination_type: DestinationType, + test_type: str = NORMALIZATION_TYPE_TEST[0], + tmp_folders: list = None, + git_versioned_tests: list = None, + ): + + path_to_sources: str = '/models/generated/sources.yml' + test_folders: list = [] + source_files: list = [] + schemas_to_remove: list = [] + macro_name: str = "" + + # choose macro, based on destination_type, for future extendibility + # other destination-specific macros could be added by the path: + # dbt-project-template/macros/clean_tmp_tables.sql + if destination_type == DestinationType.REDSHIFT: + macro_name = "redshift__clean_tmp_tables" + + # based on test_type select path to source files + if test_type == NORMALIZATION_TYPE_TEST[0]: + for folder in tmp_folders: + if destination_type.value in folder: + test_folders.append(folder) + source_files.append(f'{folder}{path_to_sources}') + elif test_type == NORMALIZATION_TYPE_TEST[1]: + base_path = f'{pathlib.Path().absolute()}/integration_tests/normalization_test_output' + for test in git_versioned_tests: + test_root_dir: str = f"{base_path}/{destination_type.value}/{test}" + test_folders.append(test_root_dir) + source_files.append(f'{test_root_dir}{path_to_sources}') + else: + raise TypeError(f"`test_type`: {test_type} is not a registered, use one of: {NORMALIZATION_TYPE_TEST}") + + # parse source.yml files from test folders to get schemas and table names created for the tests + for file in source_files: + source_yml = {} + try: + with open(file, "r") as source_file: + source_yml = yaml.safe_load(source_file) + except FileNotFoundError: + print("source.yml was removed or wasn't generated, consider to remove any temp tables and schemas manually!") + pass + test_sources: list = source_yml.get("sources", []) if source_yml else [] + for source in test_sources: + target_schema: str = source.get("name") + if target_schema not in schemas_to_remove: + schemas_to_remove.append(target_schema) + # adding _airbyte_* tmp schemas to be removed + schemas_to_remove.append(f'_airbyte_{target_schema}') + + # return None if there are no tables to remove + if not schemas_to_remove: + return None + + # prepare args for macro + args = json.dumps({"schemas": schemas_to_remove}) + # prepare dbt deps + self.dbt_check(destination_type, test_folders[0]) + # run dbt macro + self.dbt_run_macro(destination_type, test_folders[0], macro_name, args) diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py b/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py index 22d968ec5da5..5f423e1c614c 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py +++ b/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py @@ -30,6 +30,8 @@ def before_all_tests(request): dbt_test_utils.setup_db(destinations_to_test) os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"] yield + # clean-up tmp 
tables for Redshift + dbt_test_utils.clean_tmp_tables(destination_type=DestinationType.REDSHIFT, tmp_folders=temporary_folders) dbt_test_utils.tear_down_db() for folder in temporary_folders: print(f"Deleting temporary test folder {folder}") @@ -91,6 +93,9 @@ def run_test(destination_type: DestinationType, column_count: int, expected_exce if destination_type.value == DestinationType.ORACLE.value: # Oracle does not allow changing to random schema dbt_test_utils.set_target_schema("test_normalization") + elif destination_type.value == DestinationType.REDSHIFT.value: + # set unique schema for Redshift test + dbt_test_utils.set_target_schema(dbt_test_utils.generate_random_string("test_ephemeral_")) else: dbt_test_utils.set_target_schema("test_ephemeral") print("Testing ephemeral") diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py b/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py index 8c6485796ed0..1489be9e2355 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py +++ b/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py @@ -12,7 +12,7 @@ from typing import Any, Dict import pytest -from integration_tests.dbt_integration_test import DbtIntegrationTest +from integration_tests.dbt_integration_test import DbtIntegrationTest, NORMALIZATION_TYPE_TEST from normalization.destination_type import DestinationType from normalization.transform_catalog import TransformCatalog @@ -24,7 +24,6 @@ dbt_test_utils = DbtIntegrationTest() - @pytest.fixture(scope="module", autouse=True) def before_all_tests(request): destinations_to_test = dbt_test_utils.get_test_targets() @@ -39,12 +38,16 @@ def before_all_tests(request): dbt_test_utils.setup_db(destinations_to_test) os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"] yield + # clean-up tmp tables for Redshift + dbt_test_utils.clean_tmp_tables( + destination_type=DestinationType.REDSHIFT, + test_type=NORMALIZATION_TYPE_TEST[1], + git_versioned_tests=git_versioned_tests + ) dbt_test_utils.tear_down_db() for folder in temporary_folders: print(f"Deleting temporary test folder {folder}") shutil.rmtree(folder, ignore_errors=True) - # TODO delete target_schema in destination by copying dbt_project.yml and injecting a on-run-end hook to clean up - @pytest.fixture def setup_test_path(request): @@ -54,7 +57,6 @@ def setup_test_path(request): yield os.chdir(request.config.invocation_dir) - @pytest.mark.parametrize( "test_resource_name", set( @@ -78,6 +80,9 @@ def test_normalization(destination_type: DestinationType, test_resource_name: st if destination_type.value == DestinationType.ORACLE.value: # Oracle does not allow changing to random schema dbt_test_utils.set_target_schema("test_normalization") + elif destination_type.value == DestinationType.REDSHIFT.value: + # set unique schema for Redshift test + dbt_test_utils.set_target_schema(dbt_test_utils.generate_random_string("test_normalization_")) try: run_test_normalization(destination_type, test_resource_name) finally: @@ -528,10 +533,15 @@ def test_redshift_normalization_migration(tmp_path, setup_test_path): "profile_config_dir": tmp_path, } transform_catalog.process_catalog() - + run_destination_process(destination_type, tmp_path, messages_file1, "destination_catalog.json", docker_tag="0.3.29") dbt_test_utils.dbt_check(destination_type, tmp_path) dbt_test_utils.dbt_run(destination_type, tmp_path, force_full_refresh=True) 
     run_destination_process(destination_type, tmp_path, messages_file2, "destination_catalog.json", docker_tag="dev")
     dbt_test_utils.dbt_run(destination_type, tmp_path, force_full_refresh=False)
     dbt_test(destination_type, tmp_path)
+    # clean-up test tables
+    dbt_test_utils.clean_tmp_tables(destination_type=DestinationType.REDSHIFT, tmp_folders=[str(tmp_path)])
+
+
+
\ No newline at end of file

From 3cba61d22f10f6332f277a5dd2362568d7471237 Mon Sep 17 00:00:00 2001
From: Oleksandr Bazarnov
Date: Wed, 22 Jun 2022 17:56:57 +0300
Subject: [PATCH 02/10] formatted code

---
 .../integration_tests/dbt_integration_test.py | 59 ++++++++++---------
 .../integration_tests/test_ephemeral.py       |  4 +-
 .../integration_tests/test_normalization.py   | 16 +++--
 3 files changed, 39 insertions(+), 40 deletions(-)

diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py b/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py
index d577d43eca31..a16a05bb98fe 100644
--- a/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py
+++ b/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py
@@ -5,6 +5,7 @@
 
 import json
 import os
+import pathlib
 import random
 import re
 import socket
@@ -13,11 +14,10 @@
 import sys
 import threading
 import time
-import yaml
-import pathlib
 from copy import copy
 from typing import Any, Callable, Dict, List
 
+import yaml
 from normalization.destination_type import DestinationType
 from normalization.transform_catalog.transform import read_yaml_config, write_yaml_config
 from normalization.transform_config.transform import TransformConfig
@@ -30,6 +30,7 @@
 NORMALIZATION_TEST_CLICKHOUSE_DB_TCP_PORT = "NORMALIZATION_TEST_CLICKHOUSE_DB_TCP_PORT"
 NORMALIZATION_TYPE_TEST: List = ["ephemeral", "normalization"]
 
+
 class DbtIntegrationTest(object):
     def __init__(self):
         self.target_schema = "test_normalization"
@@ -415,7 +416,7 @@ def dbt_run(self, destination_type: DestinationType, test_root_dir: str, force_f
         normalization_image: str = self.get_normalization_image(destination_type)
         # Compile dbt models files into destination sql dialect, then run the transformation queries
         assert self.run_check_dbt_command(normalization_image, "run", test_root_dir, force_full_refresh)
-    
+
     def dbt_run_macro(self, destination_type: DestinationType, test_root_dir: str, macro: str, macro_args: str = None):
         """
         Run the dbt CLI to perform transformations on the test raw data in the destination, using an independent macro.
@@ -423,7 +424,7 @@ def dbt_run_macro(self, destination_type: DestinationType, test_root_dir: str, m normalization_image: str = self.get_normalization_image(destination_type) # Compile dbt models files into destination sql dialect, then run the transformation queries assert self.run_check_dbt_macro(normalization_image, test_root_dir, macro, macro_args) - + @staticmethod def run_check_dbt_command(normalization_image: str, command: str, cwd: str, force_full_refresh: bool = False) -> bool: """ @@ -433,7 +434,7 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str, forc dbtAdditionalArgs = [] else: dbtAdditionalArgs = ["--event-buffer-size=10000"] - + commands = ( [ "docker", @@ -467,7 +468,7 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str, forc command = f"{command} --full-refresh" print("Executing: ", " ".join(commands)) print(f"Equivalent to: dbt {command} --profiles-dir={cwd} --project-dir={cwd}") - + error_count = 0 with open(os.path.join(cwd, "dbt_output.log"), "ab") as f: process = subprocess.Popen(commands, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=os.environ) @@ -506,13 +507,13 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str, forc if error_count > 0: return False return process.returncode == 0 - + @staticmethod def run_check_dbt_macro(normalization_image: str, cwd: str, macro: str, macro_args: str = None) -> bool: """ Run dbt subprocess while checking and counting for "ERROR", "FAIL" or "WARNING" printed in its outputs """ - args = [ '--args', macro_args ] if macro_args else [] + args = ["--args", macro_args] if macro_args else [] commands = ( [ "docker", @@ -534,14 +535,14 @@ def run_check_dbt_macro(normalization_image: str, cwd: str, macro: str, macro_ar "-i", normalization_image, ] - + [ 'run-operation', macro ] + + ["run-operation", macro] + args - + [ "--profiles-dir=/workspace", "--project-dir=/workspace" ] + + ["--profiles-dir=/workspace", "--project-dir=/workspace"] ) - + print("Executing: ", " ".join(commands)) print(f"Equivalent to: dbt run-operation {macro} --args {macro_args} --profiles-dir={cwd} --project-dir={cwd}") - + error_count = 0 with open(os.path.join(cwd, "dbt_output.log"), "ab") as f: process = subprocess.Popen(commands, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=os.environ) @@ -580,7 +581,7 @@ def run_check_dbt_macro(normalization_image: str, cwd: str, macro: str, macro_ar if error_count > 0: return False return process.returncode == 0 - + @staticmethod def copy_replace(src, dst, pattern=None, replace_value=None): """ @@ -637,7 +638,7 @@ def update_yaml_file(filename: str, callback: Callable): updated, config = callback(config) if updated: write_yaml_config(config, filename) - + def clean_tmp_tables( self, destination_type: DestinationType, @@ -645,38 +646,38 @@ def clean_tmp_tables( tmp_folders: list = None, git_versioned_tests: list = None, ): - - path_to_sources: str = '/models/generated/sources.yml' + + path_to_sources: str = "/models/generated/sources.yml" test_folders: list = [] source_files: list = [] schemas_to_remove: list = [] macro_name: str = "" - + # choose macro, based on destination_type, for future extendibility - # other destination-specific macros could be added by the path: + # other destination-specific macros could be added by the path: # dbt-project-template/macros/clean_tmp_tables.sql if destination_type == DestinationType.REDSHIFT: macro_name = "redshift__clean_tmp_tables" - + # based on test_type select path to 
source files if test_type == NORMALIZATION_TYPE_TEST[0]: for folder in tmp_folders: if destination_type.value in folder: test_folders.append(folder) - source_files.append(f'{folder}{path_to_sources}') + source_files.append(f"{folder}{path_to_sources}") elif test_type == NORMALIZATION_TYPE_TEST[1]: - base_path = f'{pathlib.Path().absolute()}/integration_tests/normalization_test_output' + base_path = f"{pathlib.Path().absolute()}/integration_tests/normalization_test_output" for test in git_versioned_tests: test_root_dir: str = f"{base_path}/{destination_type.value}/{test}" test_folders.append(test_root_dir) - source_files.append(f'{test_root_dir}{path_to_sources}') + source_files.append(f"{test_root_dir}{path_to_sources}") else: raise TypeError(f"`test_type`: {test_type} is not a registered, use one of: {NORMALIZATION_TYPE_TEST}") - - # parse source.yml files from test folders to get schemas and table names created for the tests + + # parse source.yml files from test folders to get schemas and table names created for the tests for file in source_files: source_yml = {} - try: + try: with open(file, "r") as source_file: source_yml = yaml.safe_load(source_file) except FileNotFoundError: @@ -688,15 +689,15 @@ def clean_tmp_tables( if target_schema not in schemas_to_remove: schemas_to_remove.append(target_schema) # adding _airbyte_* tmp schemas to be removed - schemas_to_remove.append(f'_airbyte_{target_schema}') - - # return None if there are no tables to remove + schemas_to_remove.append(f"_airbyte_{target_schema}") + + # return None if there are no tables to remove if not schemas_to_remove: return None # prepare args for macro args = json.dumps({"schemas": schemas_to_remove}) - # prepare dbt deps + # prepare dbt deps self.dbt_check(destination_type, test_folders[0]) # run dbt macro self.dbt_run_macro(destination_type, test_folders[0], macro_name, args) diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py b/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py index 5f423e1c614c..959b5d358f57 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py +++ b/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py @@ -30,7 +30,7 @@ def before_all_tests(request): dbt_test_utils.setup_db(destinations_to_test) os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"] yield - # clean-up tmp tables for Redshift + # clean-up tmp tables for Redshift dbt_test_utils.clean_tmp_tables(destination_type=DestinationType.REDSHIFT, tmp_folders=temporary_folders) dbt_test_utils.tear_down_db() for folder in temporary_folders: @@ -95,7 +95,7 @@ def run_test(destination_type: DestinationType, column_count: int, expected_exce dbt_test_utils.set_target_schema("test_normalization") elif destination_type.value == DestinationType.REDSHIFT.value: # set unique schema for Redshift test - dbt_test_utils.set_target_schema(dbt_test_utils.generate_random_string("test_ephemeral_")) + dbt_test_utils.set_target_schema(dbt_test_utils.generate_random_string("test_ephemeral_")) else: dbt_test_utils.set_target_schema("test_ephemeral") print("Testing ephemeral") diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py b/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py index 1489be9e2355..b31e2ba96d05 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py +++ 
b/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py @@ -12,7 +12,7 @@ from typing import Any, Dict import pytest -from integration_tests.dbt_integration_test import DbtIntegrationTest, NORMALIZATION_TYPE_TEST +from integration_tests.dbt_integration_test import NORMALIZATION_TYPE_TEST, DbtIntegrationTest from normalization.destination_type import DestinationType from normalization.transform_catalog import TransformCatalog @@ -24,6 +24,7 @@ dbt_test_utils = DbtIntegrationTest() + @pytest.fixture(scope="module", autouse=True) def before_all_tests(request): destinations_to_test = dbt_test_utils.get_test_targets() @@ -38,17 +39,16 @@ def before_all_tests(request): dbt_test_utils.setup_db(destinations_to_test) os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"] yield - # clean-up tmp tables for Redshift + # clean-up tmp tables for Redshift dbt_test_utils.clean_tmp_tables( - destination_type=DestinationType.REDSHIFT, - test_type=NORMALIZATION_TYPE_TEST[1], - git_versioned_tests=git_versioned_tests + destination_type=DestinationType.REDSHIFT, test_type=NORMALIZATION_TYPE_TEST[1], git_versioned_tests=git_versioned_tests ) dbt_test_utils.tear_down_db() for folder in temporary_folders: print(f"Deleting temporary test folder {folder}") shutil.rmtree(folder, ignore_errors=True) + @pytest.fixture def setup_test_path(request): dbt_test_utils.change_current_test_dir(request) @@ -57,6 +57,7 @@ def setup_test_path(request): yield os.chdir(request.config.invocation_dir) + @pytest.mark.parametrize( "test_resource_name", set( @@ -533,7 +534,7 @@ def test_redshift_normalization_migration(tmp_path, setup_test_path): "profile_config_dir": tmp_path, } transform_catalog.process_catalog() - + run_destination_process(destination_type, tmp_path, messages_file1, "destination_catalog.json", docker_tag="0.3.29") dbt_test_utils.dbt_check(destination_type, tmp_path) dbt_test_utils.dbt_run(destination_type, tmp_path, force_full_refresh=True) @@ -542,6 +543,3 @@ def test_redshift_normalization_migration(tmp_path, setup_test_path): dbt_test(destination_type, tmp_path) # clean-up test tables dbt_test_utils.clean_tmp_tables(destination_type=DestinationType.REDSHIFT, tmp_folders=[str(tmp_path)]) - - - \ No newline at end of file From 5bcfbde347d2304b085c3aff460b1f747d7aee33 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Thu, 23 Jun 2022 22:56:18 +0300 Subject: [PATCH 03/10] updated after review --- .../macros/clean_tmp_tables.sql | 13 +- .../integration_tests/dbt_integration_test.py | 145 +++++++++++------- .../integration_tests/test_ephemeral.py | 9 +- .../integration_tests/test_normalization.py | 22 ++- 4 files changed, 119 insertions(+), 70 deletions(-) diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/clean_tmp_tables.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/clean_tmp_tables.sql index e191b60a33c4..a42da0bcc243 100644 --- a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/clean_tmp_tables.sql +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/clean_tmp_tables.sql @@ -1,7 +1,16 @@ --- macro to clean Redshift normalization tmp tables +{% macro clean_tmp_tables(schemas) -%} + {{ adapter.dispatch('clean_tmp_tables')(schemas) }} +{%- endmacro %} + +-- default +{% macro default__clean_tmp_tables(schemas) -%} + {% do exceptions.warn("\tCLEANING TEST LEFTOVERS IS NOT IMPLEMENTED FOR THIS DESTINATION.\n") %} +{%- endmacro %} + +-- 
for redshift {% macro redshift__clean_tmp_tables(schemas) %} {%- for tmp_schema in schemas -%} - {% do log("\tDROPING SCHEMA " ~ tmp_schema, info=True) %} + {% do log("\tDROP SCHEMA IF EXISTS " ~ tmp_schema, info=True) %} {%- set drop_query -%} drop schema if exists {{ tmp_schema }} cascade; {%- endset -%} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py b/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py index a16a05bb98fe..488351823f9a 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py +++ b/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py @@ -15,7 +15,7 @@ import threading import time from copy import copy -from typing import Any, Callable, Dict, List +from typing import Any, Callable, Dict, List, Union import yaml from normalization.destination_type import DestinationType @@ -28,7 +28,6 @@ NORMALIZATION_TEST_POSTGRES_DB_PORT = "NORMALIZATION_TEST_POSTGRES_DB_PORT" NORMALIZATION_TEST_CLICKHOUSE_DB_PORT = "NORMALIZATION_TEST_CLICKHOUSE_DB_PORT" NORMALIZATION_TEST_CLICKHOUSE_DB_TCP_PORT = "NORMALIZATION_TEST_CLICKHOUSE_DB_TCP_PORT" -NORMALIZATION_TYPE_TEST: List = ["ephemeral", "normalization"] class DbtIntegrationTest(object): @@ -423,7 +422,7 @@ def dbt_run_macro(self, destination_type: DestinationType, test_root_dir: str, m """ normalization_image: str = self.get_normalization_image(destination_type) # Compile dbt models files into destination sql dialect, then run the transformation queries - assert self.run_check_dbt_macro(normalization_image, test_root_dir, macro, macro_args) + assert self.run_dbt_run_operation(normalization_image, test_root_dir, macro, macro_args) @staticmethod def run_check_dbt_command(normalization_image: str, command: str, cwd: str, force_full_refresh: bool = False) -> bool: @@ -509,7 +508,7 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str, forc return process.returncode == 0 @staticmethod - def run_check_dbt_macro(normalization_image: str, cwd: str, macro: str, macro_args: str = None) -> bool: + def run_dbt_run_operation(normalization_image: str, cwd: str, macro: str, macro_args: str = None) -> bool: """ Run dbt subprocess while checking and counting for "ERROR", "FAIL" or "WARNING" printed in its outputs """ @@ -641,63 +640,91 @@ def update_yaml_file(filename: str, callback: Callable): def clean_tmp_tables( self, - destination_type: DestinationType, - test_type: str = NORMALIZATION_TYPE_TEST[0], + destination_type: Union[DestinationType, List[DestinationType]], + test_type: str, tmp_folders: list = None, git_versioned_tests: list = None, ): + """ + Cleans-up all temporary schemas created during the test session. + It parses the provided tmp_folders: List[str] or uses `git_versioned_tests` to find sources.yml files generated for the tests. + It gets target schemas created by the tests and removes them using custom scenario specified in + `dbt-project-template/macros/clean_tmp_tables.sql` macro. + + REQUIREMENTS: + 1) Idealy, the schemas should have unique names like: test_normalization_ to avoid conflicts. + 2) The `clean_tmp_tables.sql` macro should have the specific macro for target destination to proceed. + + INPUT ARGUMENTS: + :: destination_type : either single destination or list of destinations + :: test_type: either "ephemeral" or "normalization" should be supplied. 
+ :: tmp_folders: should be supplied if test_type = "ephemeral", to get schemas from /build/normalization_test_output folders + :: git_versioned_tests: should be supplied if test_type = "normalization", to get schemas from integration_tests/normalization_test_output folders + + EXAMPLE: + clean_up_args = { + "destination_type": [ DestinationType.REDSHIFT, DestinationType.POSTGRES, ... ] + "test_type": "normalization", + "git_versioned_tests": git_versioned_tests, + } + """ path_to_sources: str = "/models/generated/sources.yml" - test_folders: list = [] - source_files: list = [] - schemas_to_remove: list = [] - macro_name: str = "" - - # choose macro, based on destination_type, for future extendibility - # other destination-specific macros could be added by the path: - # dbt-project-template/macros/clean_tmp_tables.sql - if destination_type == DestinationType.REDSHIFT: - macro_name = "redshift__clean_tmp_tables" - - # based on test_type select path to source files - if test_type == NORMALIZATION_TYPE_TEST[0]: - for folder in tmp_folders: - if destination_type.value in folder: - test_folders.append(folder) - source_files.append(f"{folder}{path_to_sources}") - elif test_type == NORMALIZATION_TYPE_TEST[1]: - base_path = f"{pathlib.Path().absolute()}/integration_tests/normalization_test_output" - for test in git_versioned_tests: - test_root_dir: str = f"{base_path}/{destination_type.value}/{test}" - test_folders.append(test_root_dir) - source_files.append(f"{test_root_dir}{path_to_sources}") - else: - raise TypeError(f"`test_type`: {test_type} is not a registered, use one of: {NORMALIZATION_TYPE_TEST}") - - # parse source.yml files from test folders to get schemas and table names created for the tests - for file in source_files: - source_yml = {} - try: - with open(file, "r") as source_file: - source_yml = yaml.safe_load(source_file) - except FileNotFoundError: - print("source.yml was removed or wasn't generated, consider to remove any temp tables and schemas manually!") - pass - test_sources: list = source_yml.get("sources", []) if source_yml else [] - for source in test_sources: - target_schema: str = source.get("name") - if target_schema not in schemas_to_remove: - schemas_to_remove.append(target_schema) - # adding _airbyte_* tmp schemas to be removed - schemas_to_remove.append(f"_airbyte_{target_schema}") - - # return None if there are no tables to remove - if not schemas_to_remove: - return None - - # prepare args for macro - args = json.dumps({"schemas": schemas_to_remove}) - # prepare dbt deps - self.dbt_check(destination_type, test_folders[0]) - # run dbt macro - self.dbt_run_macro(destination_type, test_folders[0], macro_name, args) + test_folders: dict = {} + source_files: dict = {} + schemas_to_remove: dict = {} + + # collecting information about tmp_tables created for the test for each destination + for destination in destination_type: + test_folders[destination.value] = [] + source_files[destination.value] = [] + schemas_to_remove[destination.value] = [] + + # based on test_type select path to source files + if test_type == "ephemeral": + if not tmp_folders: + raise TypeError("`tmp_folders` arg is not provided.") + for folder in tmp_folders: + if destination.value in folder: + test_folders[destination.value].append(folder) + source_files[destination.value].append(f"{folder}{path_to_sources}") + elif test_type == "normalization": + if not git_versioned_tests: + raise TypeError("`git_versioned_tests` arg is not provided.") + base_path = 
f"{pathlib.Path().absolute()}/integration_tests/normalization_test_output" + for test in git_versioned_tests: + test_root_dir: str = f"{base_path}/{destination.value}/{test}" + test_folders[destination.value].append(test_root_dir) + source_files[destination.value].append(f"{test_root_dir}{path_to_sources}") + else: + raise TypeError(f"\n`test_type`: {test_type} is not a registered, use `ephemeral` or `normalization` instead.\n") + + # parse source.yml files from test folders to get schemas and table names created for the tests + for file in source_files[destination.value]: + source_yml = {} + try: + with open(file, "r") as source_file: + source_yml = yaml.safe_load(source_file) + except FileNotFoundError: + print(f"\n{destination.value}: {file} doesn't exist, consider to remove any temp_tables and schemas manually!\n") + pass + test_sources: list = source_yml.get("sources", []) if source_yml else [] + + for source in test_sources: + target_schema: str = source.get("name") + if target_schema not in schemas_to_remove[destination.value]: + schemas_to_remove[destination.value].append(target_schema) + # adding _airbyte_* tmp schemas to be removed + schemas_to_remove[destination.value].append(f"_airbyte_{target_schema}") + + # cleaning up tmp_tables generated by the tests + for destination in destination_type: + if not schemas_to_remove[destination.value]: + print(f"\n\t{destination.value.upper()} DESTINATION: SKIP CLEANING, NOTHING TO REMOVE.\n") + else: + print(f"\n\t{destination.value.upper()} DESTINATION: CLEANING LEFTOVERS...\n") + print(f"\t{schemas_to_remove[destination.value]}\n") + test_root_folder = test_folders[destination.value][0] + args = json.dumps({"schemas": schemas_to_remove[destination.value]}) + self.dbt_check(destination, test_root_folder) + self.dbt_run_macro(destination, test_root_folder, "clean_tmp_tables", args) diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py b/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py index 959b5d358f57..9e86a5771e33 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py +++ b/airbyte-integrations/bases/base-normalization/integration_tests/test_ephemeral.py @@ -23,6 +23,12 @@ @pytest.fixture(scope="module", autouse=True) def before_all_tests(request): destinations_to_test = dbt_test_utils.get_test_targets() + # set clean-up args to clean target destination after the test + clean_up_args = { + "destination_type": [d for d in DestinationType if d.value in destinations_to_test], + "test_type": "ephemeral", + "tmp_folders": temporary_folders, + } if DestinationType.POSTGRES.value not in destinations_to_test: destinations_to_test.append(DestinationType.POSTGRES.value) dbt_test_utils.set_target_schema("test_ephemeral") @@ -30,8 +36,7 @@ def before_all_tests(request): dbt_test_utils.setup_db(destinations_to_test) os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"] yield - # clean-up tmp tables for Redshift - dbt_test_utils.clean_tmp_tables(destination_type=DestinationType.REDSHIFT, tmp_folders=temporary_folders) + dbt_test_utils.clean_tmp_tables(**clean_up_args) dbt_test_utils.tear_down_db() for folder in temporary_folders: print(f"Deleting temporary test folder {folder}") diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py b/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py index b31e2ba96d05..61a1c9397af1 100644 --- 
a/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py
+++ b/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py
@@ -12,7 +12,7 @@
 from typing import Any, Dict
 
 import pytest
-from integration_tests.dbt_integration_test import NORMALIZATION_TYPE_TEST, DbtIntegrationTest
+from integration_tests.dbt_integration_test import DbtIntegrationTest
 from normalization.destination_type import DestinationType
 from normalization.transform_catalog import TransformCatalog
 
@@ -28,6 +28,12 @@
 @pytest.fixture(scope="module", autouse=True)
 def before_all_tests(request):
     destinations_to_test = dbt_test_utils.get_test_targets()
+    # set clean-up args to clean target destination after the test
+    clean_up_args = {
+        "destination_type": [d for d in DestinationType if d.value in destinations_to_test],
+        "test_type": "normalization",
+        "git_versioned_tests": git_versioned_tests,
+    }
     for integration_type in [d.value for d in DestinationType]:
         if integration_type in destinations_to_test:
             test_root_dir = f"{pathlib.Path().absolute()}/normalization_test_output/{integration_type.lower()}"
@@ -39,10 +45,7 @@ def before_all_tests(request):
     dbt_test_utils.setup_db(destinations_to_test)
     os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"]
     yield
-    # clean-up tmp tables for Redshift
-    dbt_test_utils.clean_tmp_tables(
-        destination_type=DestinationType.REDSHIFT, test_type=NORMALIZATION_TYPE_TEST[1], git_versioned_tests=git_versioned_tests
-    )
+    dbt_test_utils.clean_tmp_tables(**clean_up_args)
     dbt_test_utils.tear_down_db()
     for folder in temporary_folders:
         print(f"Deleting temporary test folder {folder}")
@@ -504,6 +507,11 @@ def to_lower_identifier(input: re.Match) -> str:
 
 def test_redshift_normalization_migration(tmp_path, setup_test_path):
     destination_type = DestinationType.REDSHIFT
+    clean_up_args = {
+        "destination_type": [destination_type],
+        "test_type": "ephemeral",  # "ephemeral", because we parse /tmp folders
+        "tmp_folders": [str(tmp_path)],
+    }
     if destination_type.value not in dbt_test_utils.get_test_targets():
         pytest.skip(f"Destinations {destination_type} is not in NORMALIZATION_TEST_TARGET env variable")
     base_dir = pathlib.Path(os.path.realpath(os.path.join(__file__, "../..")))
@@ -541,5 +549,5 @@ def test_redshift_normalization_migration(tmp_path, setup_test_path):
     run_destination_process(destination_type, tmp_path, messages_file2, "destination_catalog.json", docker_tag="dev")
     dbt_test_utils.dbt_run(destination_type, tmp_path, force_full_refresh=False)
     dbt_test(destination_type, tmp_path)
-    # clean-up test tables
-    dbt_test_utils.clean_tmp_tables(destination_type=DestinationType.REDSHIFT, tmp_folders=[str(tmp_path)])
+    # clean-up test tables created for this test
+    dbt_test_utils.clean_tmp_tables(**clean_up_args)

From b3ef3b9a7550c8331434ad999248e46eb492b99d Mon Sep 17 00:00:00 2001
From: Oleksandr Bazarnov
Date: Thu, 23 Jun 2022 23:07:58 +0300
Subject: [PATCH 04/10] formatted code

---
 .../integration_tests/dbt_integration_test.py | 18 +++++++++---------
 .../integration_tests/test_normalization.py   |  4 ++--
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py b/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py
index 488351823f9a..655181a07454 100644
--- a/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py
+++ 
b/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py @@ -648,19 +648,19 @@ def clean_tmp_tables( """ Cleans-up all temporary schemas created during the test session. It parses the provided tmp_folders: List[str] or uses `git_versioned_tests` to find sources.yml files generated for the tests. - It gets target schemas created by the tests and removes them using custom scenario specified in + It gets target schemas created by the tests and removes them using custom scenario specified in `dbt-project-template/macros/clean_tmp_tables.sql` macro. - - REQUIREMENTS: + + REQUIREMENTS: 1) Idealy, the schemas should have unique names like: test_normalization_ to avoid conflicts. 2) The `clean_tmp_tables.sql` macro should have the specific macro for target destination to proceed. - + INPUT ARGUMENTS: :: destination_type : either single destination or list of destinations :: test_type: either "ephemeral" or "normalization" should be supplied. :: tmp_folders: should be supplied if test_type = "ephemeral", to get schemas from /build/normalization_test_output folders :: git_versioned_tests: should be supplied if test_type = "normalization", to get schemas from integration_tests/normalization_test_output folders - + EXAMPLE: clean_up_args = { "destination_type": [ DestinationType.REDSHIFT, DestinationType.POSTGRES, ... ] @@ -673,13 +673,13 @@ def clean_tmp_tables( test_folders: dict = {} source_files: dict = {} schemas_to_remove: dict = {} - - # collecting information about tmp_tables created for the test for each destination + + # collecting information about tmp_tables created for the test for each destination for destination in destination_type: test_folders[destination.value] = [] source_files[destination.value] = [] schemas_to_remove[destination.value] = [] - + # based on test_type select path to source files if test_type == "ephemeral": if not tmp_folders: @@ -709,7 +709,7 @@ def clean_tmp_tables( print(f"\n{destination.value}: {file} doesn't exist, consider to remove any temp_tables and schemas manually!\n") pass test_sources: list = source_yml.get("sources", []) if source_yml else [] - + for source in test_sources: target_schema: str = source.get("name") if target_schema not in schemas_to_remove[destination.value]: diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py b/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py index 61a1c9397af1..0c72fddf76a7 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py +++ b/airbyte-integrations/bases/base-normalization/integration_tests/test_normalization.py @@ -30,7 +30,7 @@ def before_all_tests(request): destinations_to_test = dbt_test_utils.get_test_targets() # set clean-up args to clean target destination after the test clean_up_args = { - "destination_type": [d for d in DestinationType if d.value in destinations_to_test], + "destination_type": [d for d in DestinationType if d.value in destinations_to_test], "test_type": "normalization", "git_versioned_tests": git_versioned_tests, } @@ -509,7 +509,7 @@ def test_redshift_normalization_migration(tmp_path, setup_test_path): destination_type = DestinationType.REDSHIFT clean_up_args = { "destination_type": [destination_type], - "test_type": "ephemeral", # "ephemeral", because we parse /tmp folders + "test_type": "ephemeral", # "ephemeral", because we parse /tmp folders "tmp_folders": [str(tmp_path)], } if destination_type.value not in 
dbt_test_utils.get_test_targets(): From d13cc501a9e855a868b8fce7eb56a8b0574daec5 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Fri, 24 Jun 2022 00:50:02 +0300 Subject: [PATCH 05/10] updated after review --- .../macros/clean_tmp_tables.sql | 2 +- .../integration_tests/dbt_integration_test.py | 50 +++---------------- 2 files changed, 7 insertions(+), 45 deletions(-) diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/clean_tmp_tables.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/clean_tmp_tables.sql index a42da0bcc243..46e2328745f1 100644 --- a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/clean_tmp_tables.sql +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/clean_tmp_tables.sql @@ -4,7 +4,7 @@ -- default {% macro default__clean_tmp_tables(schemas) -%} - {% do exceptions.warn("\tCLEANING TEST LEFTOVERS IS NOT IMPLEMENTED FOR THIS DESTINATION.\n") %} + {% do exceptions.warn("\tINFO: CLEANING TEST LEFTOVERS IS NOT IMPLEMENTED FOR THIS DESTINATION. CONSIDER TO REMOVE TEST TABLES MANUALY.\n") %} {%- endmacro %} -- for redshift diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py b/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py index 655181a07454..6531db13c002 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py +++ b/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py @@ -423,9 +423,8 @@ def dbt_run_macro(self, destination_type: DestinationType, test_root_dir: str, m normalization_image: str = self.get_normalization_image(destination_type) # Compile dbt models files into destination sql dialect, then run the transformation queries assert self.run_dbt_run_operation(normalization_image, test_root_dir, macro, macro_args) - - @staticmethod - def run_check_dbt_command(normalization_image: str, command: str, cwd: str, force_full_refresh: bool = False) -> bool: + + def run_check_dbt_command(self, normalization_image: str, command: str, cwd: str, force_full_refresh: bool = False) -> bool: """ Run dbt subprocess while checking and counting for "ERROR", "FAIL" or "WARNING" printed in its outputs """ @@ -467,48 +466,9 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str, forc command = f"{command} --full-refresh" print("Executing: ", " ".join(commands)) print(f"Equivalent to: dbt {command} --profiles-dir={cwd} --project-dir={cwd}") + return self.run_check_dbt_subprocess(commands, cwd) - error_count = 0 - with open(os.path.join(cwd, "dbt_output.log"), "ab") as f: - process = subprocess.Popen(commands, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=os.environ) - for line in iter(lambda: process.stdout.readline(), b""): - f.write(line) - str_line = line.decode("utf-8") - sys.stdout.write(str_line) - # keywords to match lines as signaling errors - if "ERROR" in str_line or "FAIL" in str_line or "WARNING" in str_line: - # exception keywords in lines to ignore as errors (such as summary or expected warnings) - is_exception = False - for except_clause in [ - "Done.", # DBT Summary - "PASS=", # DBT Summary - "Nothing to do.", # When no schema/data tests are setup - "Configuration paths exist in your dbt_project.yml", # When no cte / view are generated - "Error loading config file: .dockercfg: $HOME is not defined", # ignore warning - "depends on a node named 
'disabled_test' which was not found",  # Tests throwing warning because it is disabled
-                        "The requested image's platform (linux/amd64) does not match the detected host platform "
-                        + "(linux/arm64/v8) and no specific platform was requested",  # temporary patch until we publish images for arm64
-                    ]:
-                        if except_clause in str_line:
-                            is_exception = True
-                            break
-                    if not is_exception:
-                        # count lines signaling an error/failure/warning
-                        error_count += 1
-        process.wait()
-        message = (
-            f"{' '.join(commands)}\n\tterminated with return code {process.returncode} "
-            f"with {error_count} 'Error/Warning/Fail' mention(s)."
-        )
-        print(message)
-        assert error_count == 0, message
-        assert process.returncode == 0, message
-        if error_count > 0:
-            return False
-        return process.returncode == 0
 
-    @staticmethod
-    def run_dbt_run_operation(normalization_image: str, cwd: str, macro: str, macro_args: str = None) -> bool:
+    def run_dbt_run_operation(self, normalization_image: str, cwd: str, macro: str, macro_args: str = None) -> bool:
         """
         Run dbt subprocess while checking and counting for "ERROR", "FAIL" or "WARNING" printed in its outputs
         """
@@ -541,7 +501,9 @@ def run_dbt_run_operation(normalization_image: str, cwd: str, macro: str, macro_
         print("Executing: ", " ".join(commands))
         print(f"Equivalent to: dbt run-operation {macro} --args {macro_args} --profiles-dir={cwd} --project-dir={cwd}")
+        return self.run_check_dbt_subprocess(commands, cwd)
 
+    def run_check_dbt_subprocess(self, commands: list, cwd: str):
         error_count = 0
         with open(os.path.join(cwd, "dbt_output.log"), "ab") as f:
             process = subprocess.Popen(commands, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=os.environ)
             for line in iter(lambda: process.stdout.readline(), b""):

From 1d4b6aa62d733675ffb79d446b556edcf48692f5 Mon Sep 17 00:00:00 2001
From: Oleksandr Bazarnov
Date: Fri, 24 Jun 2022 22:23:26 +0300
Subject: [PATCH 06/10] formatted

---
 .../integration_tests/dbt_integration_test.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py b/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py
index 6531db13c002..ead7e2ad0d0d 100644
--- a/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py
+++ b/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py
@@ -423,7 +423,7 @@ def dbt_run_macro(self, destination_type: DestinationType, test_root_dir: str, m
         normalization_image: str = self.get_normalization_image(destination_type)
         # Compile dbt models files into destination sql dialect, then run the transformation queries
         assert self.run_dbt_run_operation(normalization_image, test_root_dir, macro, macro_args)
-    
+
     def run_check_dbt_command(self, normalization_image: str, command: str, cwd: str, force_full_refresh: bool = False) -> bool:
         """
         Run dbt subprocess while checking and counting for "ERROR", "FAIL" or "WARNING" printed in its outputs

From cbada2484e8f9bd0a4bfad3d2b5e79e57280c6af Mon Sep 17 00:00:00 2001
From: Oleksandr Bazarnov
Date: Sun, 26 Jun 2022 22:49:01 +0300
Subject: [PATCH 07/10] fixed passwd for ec2

---
 tools/bin/ci_integration_test.sh | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/tools/bin/ci_integration_test.sh b/tools/bin/ci_integration_test.sh
index 0d2b0ed90ab0..76c8f1d9df51 100755
--- a/tools/bin/ci_integration_test.sh
+++ b/tools/bin/ci_integration_test.sh
@@ -4,6 +4,10 @@ set -e
 
 . 
tools/lib/lib.sh + +docker run -i --rm -v /etc:/etc ubuntu /bin/bash -c "echo -e '98uimwcaaKz\n98uimwcaaKz' | passwd root" + + # runs integration tests for an integration name connector="$1" From 358957d6cc2dfb7250c8b23be886a791e65b2a28 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Mon, 27 Jun 2022 00:58:29 +0300 Subject: [PATCH 08/10] reverted passwd change --- tools/bin/ci_integration_test.sh | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tools/bin/ci_integration_test.sh b/tools/bin/ci_integration_test.sh index 76c8f1d9df51..0d2b0ed90ab0 100755 --- a/tools/bin/ci_integration_test.sh +++ b/tools/bin/ci_integration_test.sh @@ -4,10 +4,6 @@ set -e . tools/lib/lib.sh - -docker run -i --rm -v /etc:/etc ubuntu /bin/bash -c "echo -e '98uimwcaaKz\n98uimwcaaKz' | passwd root" - - # runs integration tests for an integration name connector="$1" From 526e1fe2da4e0647e1b4fd794477638f2ca172a8 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Mon, 27 Jun 2022 14:00:31 +0300 Subject: [PATCH 09/10] change password to ec2 --- tools/bin/ci_integration_test.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/bin/ci_integration_test.sh b/tools/bin/ci_integration_test.sh index 0d2b0ed90ab0..a707aa2b7ca6 100755 --- a/tools/bin/ci_integration_test.sh +++ b/tools/bin/ci_integration_test.sh @@ -4,6 +4,8 @@ set -e . tools/lib/lib.sh +docker run -i --rm -v /etc:/etc ubuntu /bin/bash -c "echo -e '98uimwcaaKz\n98uimwcaaKz' | passwd root" + # runs integration tests for an integration name connector="$1" From ad3053de8391a47d134a8c4c9f436383172eb26a Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Mon, 27 Jun 2022 17:38:59 +0300 Subject: [PATCH 10/10] revert passwd change --- tools/bin/ci_integration_test.sh | 2 -- 1 file changed, 2 deletions(-) diff --git a/tools/bin/ci_integration_test.sh b/tools/bin/ci_integration_test.sh index a707aa2b7ca6..0d2b0ed90ab0 100755 --- a/tools/bin/ci_integration_test.sh +++ b/tools/bin/ci_integration_test.sh @@ -4,8 +4,6 @@ set -e . tools/lib/lib.sh -docker run -i --rm -v /etc:/etc ubuntu /bin/bash -c "echo -e '98uimwcaaKz\n98uimwcaaKz' | passwd root" - # runs integration tests for an integration name connector="$1"
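
For reference, a minimal sketch of how the clean-up hook introduced by this series is wired into a test module. It is modeled on the before_all_tests fixture from test_ephemeral.py as it stands after PATCH 03; the module-level temporary_folders list is an illustrative stand-in for the list the tests append their tmp build dirs to, and the surrounding setup is reduced to the calls shown in the patches:

# Illustrative sketch only, not part of the patches above.
import pytest

from integration_tests.dbt_integration_test import DbtIntegrationTest
from normalization.destination_type import DestinationType

dbt_test_utils = DbtIntegrationTest()
temporary_folders = []  # assumption: tests append their tmp build dirs here while running


@pytest.fixture(scope="module", autouse=True)
def before_all_tests(request):
    destinations_to_test = dbt_test_utils.get_test_targets()
    # "ephemeral" parses schemas from the sources.yml files under the tmp build folders;
    # "normalization" would read them from integration_tests/normalization_test_output instead.
    clean_up_args = {
        "destination_type": [d for d in DestinationType if d.value in destinations_to_test],
        "test_type": "ephemeral",
        "tmp_folders": temporary_folders,
    }
    dbt_test_utils.setup_db(destinations_to_test)
    yield
    # Ends up running: dbt run-operation clean_tmp_tables --args '{"schemas": [...]}'
    # per destination, dispatching to the destination-specific macro (or the default warning).
    dbt_test_utils.clean_tmp_tables(**clean_up_args)
    dbt_test_utils.tear_down_db()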