🎉 Base Normalization: clean-up Redshift tmp_schemas after SAT #14015

Merged · 15 commits · Jun 27, 2022
Changes from 7 commits
dbt-project-template/macros/clean_tmp_tables.sql
@@ -0,0 +1,19 @@
{% macro clean_tmp_tables(schemas) -%}
{{ adapter.dispatch('clean_tmp_tables')(schemas) }}
{%- endmacro %}

-- default
{% macro default__clean_tmp_tables(schemas) -%}
{% do exceptions.warn("\tCLEANING TEST LEFTOVERS IS NOT IMPLEMENTED FOR THIS DESTINATION.\n") %}
{%- endmacro %}

-- for redshift
{% macro redshift__clean_tmp_tables(schemas) %}
{%- for tmp_schema in schemas -%}
{% do log("\tDROP SCHEMA IF EXISTS " ~ tmp_schema, info=True) %}
{%- set drop_query -%}
drop schema if exists {{ tmp_schema }} cascade;
{%- endset -%}
{%- do run_query(drop_query) -%}
{%- endfor -%}
{% endmacro %}
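
For context, `adapter.dispatch` resolves to the destination-specific implementation at runtime: Redshift gets the `drop schema ... cascade` loop, while every other destination only logs the warning. A minimal sketch of how the test harness drives this macro (the schema names here are hypothetical):

import json

# Hypothetical schema names; the real list is parsed from the generated sources.yml files.
schemas = ["test_normalization_xyz", "_airbyte_test_normalization_xyz"]
macro_args = json.dumps({"schemas": schemas})
# Passed to dbt inside the normalization image, equivalent to:
#   dbt run-operation clean_tmp_tables --args '{"schemas": [...]}'
print(macro_args)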
integration_tests/dbt_integration_test.py
@@ -5,6 +5,7 @@

import json
import os
import pathlib
import random
import re
import socket
@@ -14,8 +15,9 @@
import threading
import time
from copy import copy
from typing import Any, Callable, Dict, List
from typing import Any, Callable, Dict, List, Union

import yaml
from normalization.destination_type import DestinationType
from normalization.transform_catalog.transform import read_yaml_config, write_yaml_config
from normalization.transform_config.transform import TransformConfig
@@ -414,6 +416,14 @@ def dbt_run(self, destination_type: DestinationType, test_root_dir: str, force_f
# Compile dbt model files into the destination SQL dialect, then run the transformation queries
assert self.run_check_dbt_command(normalization_image, "run", test_root_dir, force_full_refresh)

def dbt_run_macro(self, destination_type: DestinationType, test_root_dir: str, macro: str, macro_args: str = None):
"""
Run the dbt CLI to perform transformations on the test raw data in the destination, using an independent macro.
"""
normalization_image: str = self.get_normalization_image(destination_type)
# Invoke the given macro against the destination via `dbt run-operation`
assert self.run_dbt_run_operation(normalization_image, test_root_dir, macro, macro_args)
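
As a usage sketch, assuming the module-level `dbt_test_utils` helper that the integration test files use (the destination, path, and schema name here are hypothetical):

import json

from normalization.destination_type import DestinationType

args = json.dumps({"schemas": ["test_normalization_xyz"]})
dbt_test_utils.dbt_run_macro(DestinationType.REDSHIFT, "/tmp/normalization_test", "clean_tmp_tables", args)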

@staticmethod
def run_check_dbt_command(normalization_image: str, command: str, cwd: str, force_full_refresh: bool = False) -> bool:
"""
@@ -424,7 +434,6 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str, forc
else:
dbtAdditionalArgs = ["--event-buffer-size=10000"]

error_count = 0
commands = (
[
"docker",
@@ -458,6 +467,82 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str, forc
command = f"{command} --full-refresh"
print("Executing: ", " ".join(commands))
print(f"Equivalent to: dbt {command} --profiles-dir={cwd} --project-dir={cwd}")

error_count = 0
with open(os.path.join(cwd, "dbt_output.log"), "ab") as f:
process = subprocess.Popen(commands, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=os.environ)
for line in iter(lambda: process.stdout.readline(), b""):
f.write(line)
str_line = line.decode("utf-8")
sys.stdout.write(str_line)
# keywords that flag a line as a potential error
if "ERROR" in str_line or "FAIL" in str_line or "WARNING" in str_line:
# substrings that mark a flagged line as benign (summaries, expected warnings)
is_exception = False
for except_clause in [
"Done.", # DBT Summary
"PASS=", # DBT Summary
"Nothing to do.", # When no schema/data tests are setup
"Configuration paths exist in your dbt_project.yml", # When no cte / view are generated
"Error loading config file: .dockercfg: $HOME is not defined", # ignore warning
"depends on a node named 'disabled_test' which was not found", # Tests throwing warning because it is disabled
"The requested image's platform (linux/amd64) does not match the detected host platform "
+ "(linux/arm64/v8) and no specific platform was requested", # temporary patch until we publish images for arm64
]:
if except_clause in str_line:
is_exception = True
break
if not is_exception:
# count lines signaling an error/failure/warning
error_count += 1
process.wait()
message = (
f"{' '.join(commands)}\n\tterminated with return code {process.returncode} "
f"with {error_count} 'Error/Warning/Fail' mention(s)."
)
print(message)
assert error_count == 0, message
assert process.returncode == 0, message
if error_count > 0:
return False
return process.returncode == 0
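
The scanning pattern above (stream the subprocess output, flag lines containing ERROR/FAIL/WARNING, then discount allowlisted ones) is easier to see in isolation; a minimal sketch with a made-up allowlist:

def count_real_errors(lines, allowlist):
    """Count lines that mention an error keyword and match no allowlist entry."""
    errors = 0
    for line in lines:
        if any(kw in line for kw in ("ERROR", "FAIL", "WARNING")):
            if not any(ok in line for ok in allowlist):
                errors += 1
    return errors

# Hypothetical dbt output: the summary line is benign, the second line is a real error.
output = ["Done. PASS=12 WARN=0 ERROR=0", "ERROR: relation not found"]
assert count_real_errors(output, ["Done.", "PASS="]) == 1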

@staticmethod
def run_dbt_run_operation(normalization_image: str, cwd: str, macro: str, macro_args: str = None) -> bool:
"""
Run a dbt `run-operation` subprocess while checking for and counting "ERROR", "FAIL" or "WARNING" in its output
"""
args = ["--args", macro_args] if macro_args else []
commands = (
[
"docker",
"run",
"--rm",
"--init",
"-v",
f"{cwd}:/workspace",
"-v",
f"{cwd}/build:/build",
"-v",
f"{cwd}/logs:/logs",
"-v",
f"{cwd}/build/dbt_packages:/dbt",
"--network",
"host",
"--entrypoint",
"/usr/local/bin/dbt",
"-i",
normalization_image,
]
+ ["run-operation", macro]
+ args
+ ["--profiles-dir=/workspace", "--project-dir=/workspace"]
)

print("Executing: ", " ".join(commands))
print(f"Equivalent to: dbt run-operation {macro} --args {macro_args} --profiles-dir={cwd} --project-dir={cwd}")

error_count = 0
with open(os.path.join(cwd, "dbt_output.log"), "ab") as f:
process = subprocess.Popen(commands, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=os.environ)
for line in iter(lambda: process.stdout.readline(), b""):
@@ -552,3 +637,94 @@ def update_yaml_file(filename: str, callback: Callable):
updated, config = callback(config)
if updated:
write_yaml_config(config, filename)

def clean_tmp_tables(
self,
destination_type: Union[DestinationType, List[DestinationType]],
test_type: str,
tmp_folders: list = None,
git_versioned_tests: list = None,
):
"""
Cleans-up all temporary schemas created during the test session.
It parses the provided tmp_folders: List[str] or uses `git_versioned_tests` to find sources.yml files generated for the tests.
It gets target schemas created by the tests and removes them using custom scenario specified in
`dbt-project-template/macros/clean_tmp_tables.sql` macro.

REQUIREMENTS:
1) Idealy, the schemas should have unique names like: test_normalization_<some_random_string> to avoid conflicts.
2) The `clean_tmp_tables.sql` macro should have the specific macro for target destination to proceed.

INPUT ARGUMENTS:
:: destination_type : either single destination or list of destinations
:: test_type: either "ephemeral" or "normalization" should be supplied.
:: tmp_folders: should be supplied if test_type = "ephemeral", to get schemas from /build/normalization_test_output folders
:: git_versioned_tests: should be supplied if test_type = "normalization", to get schemas from integration_tests/normalization_test_output folders

EXAMPLE:
clean_up_args = {
"destination_type": [ DestinationType.REDSHIFT, DestinationType.POSTGRES, ... ]
"test_type": "normalization",
"git_versioned_tests": git_versioned_tests,
}
"""

# accept a single destination as well as a list of destinations
if isinstance(destination_type, DestinationType):
destination_type = [destination_type]

path_to_sources: str = "/models/generated/sources.yml"
test_folders: dict = {}
source_files: dict = {}
schemas_to_remove: dict = {}

# collecting information about tmp_tables created for the test for each destination
for destination in destination_type:
test_folders[destination.value] = []
source_files[destination.value] = []
schemas_to_remove[destination.value] = []

# based on test_type select path to source files
if test_type == "ephemeral":
if not tmp_folders:
raise TypeError("`tmp_folders` arg is not provided.")
for folder in tmp_folders:
if destination.value in folder:
test_folders[destination.value].append(folder)
source_files[destination.value].append(f"{folder}{path_to_sources}")
elif test_type == "normalization":
if not git_versioned_tests:
raise TypeError("`git_versioned_tests` arg is not provided.")
base_path = f"{pathlib.Path().absolute()}/integration_tests/normalization_test_output"
for test in git_versioned_tests:
test_root_dir: str = f"{base_path}/{destination.value}/{test}"
test_folders[destination.value].append(test_root_dir)
source_files[destination.value].append(f"{test_root_dir}{path_to_sources}")
else:
raise TypeError(f"\n`test_type`: {test_type} is not registered; use `ephemeral` or `normalization` instead.\n")

# parse sources.yml files from test folders to get schemas and table names created for the tests
for file in source_files[destination.value]:
source_yml = {}
try:
with open(file, "r") as source_file:
source_yml = yaml.safe_load(source_file)
except FileNotFoundError:
print(f"\n{destination.value}: {file} doesn't exist, consider removing any temporary tables and schemas manually!\n")
test_sources: list = source_yml.get("sources", []) if source_yml else []

for source in test_sources:
target_schema: str = source.get("name")
if target_schema not in schemas_to_remove[destination.value]:
schemas_to_remove[destination.value].append(target_schema)
# adding _airbyte_* tmp schemas to be removed
schemas_to_remove[destination.value].append(f"_airbyte_{target_schema}")

# cleaning up tmp_tables generated by the tests
for destination in destination_type:
if not schemas_to_remove[destination.value]:
print(f"\n\t{destination.value.upper()} DESTINATION: SKIP CLEANING, NOTHING TO REMOVE.\n")
else:
print(f"\n\t{destination.value.upper()} DESTINATION: CLEANING LEFTOVERS...\n")
print(f"\t{schemas_to_remove[destination.value]}\n")
test_root_folder = test_folders[destination.value][0]
args = json.dumps({"schemas": schemas_to_remove[destination.value]})
self.dbt_check(destination, test_root_folder)
self.dbt_run_macro(destination, test_root_folder, "clean_tmp_tables", args)
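
To make the parsing step concrete: each generated sources.yml declares the target schemas as dbt sources, and every schema is paired with its `_airbyte_` raw twin for removal. A minimal sketch with hypothetical file contents:

import yaml

# Hypothetical contents of a generated models/generated/sources.yml
sample = yaml.safe_load(
    """
version: 2
sources:
  - name: test_normalization_xyz
"""
)

schemas_to_remove = []
for source in sample.get("sources", []):
    target_schema = source.get("name")
    schemas_to_remove.append(target_schema)
    # raw tables live in a companion _airbyte_* schema, so pair it for removal
    schemas_to_remove.append(f"_airbyte_{target_schema}")
print(schemas_to_remove)  # ['test_normalization_xyz', '_airbyte_test_normalization_xyz']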
integration_tests/test_ephemeral.py
@@ -23,13 +23,20 @@
@pytest.fixture(scope="module", autouse=True)
def before_all_tests(request):
destinations_to_test = dbt_test_utils.get_test_targets()
# set clean-up args to clean target destination after the test
clean_up_args = {
"destination_type": [d for d in DestinationType if d.value in destinations_to_test],
"test_type": "ephemeral",
"tmp_folders": temporary_folders,
}
if DestinationType.POSTGRES.value not in destinations_to_test:
destinations_to_test.append(DestinationType.POSTGRES.value)
dbt_test_utils.set_target_schema("test_ephemeral")
dbt_test_utils.change_current_test_dir(request)
dbt_test_utils.setup_db(destinations_to_test)
os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"]
yield
dbt_test_utils.clean_tmp_tables(**clean_up_args)
dbt_test_utils.tear_down_db()
for folder in temporary_folders:
print(f"Deleting temporary test folder {folder}")
@@ -91,6 +98,9 @@ def run_test(destination_type: DestinationType, column_count: int, expected_exce
if destination_type.value == DestinationType.ORACLE.value:
# Oracle does not allow changing to random schema
dbt_test_utils.set_target_schema("test_normalization")
elif destination_type.value == DestinationType.REDSHIFT.value:
# set unique schema for Redshift test
dbt_test_utils.set_target_schema(dbt_test_utils.generate_random_string("test_ephemeral_"))
else:
dbt_test_utils.set_target_schema("test_ephemeral")
print("Testing ephemeral")
integration_tests/test_normalization.py
@@ -28,6 +28,12 @@
@pytest.fixture(scope="module", autouse=True)
def before_all_tests(request):
destinations_to_test = dbt_test_utils.get_test_targets()
# set clean-up args to clean target destination after the test
clean_up_args = {
"destination_type": [d for d in DestinationType if d.value in destinations_to_test],
"test_type": "normalization",
"git_versioned_tests": git_versioned_tests,
}
for integration_type in [d.value for d in DestinationType]:
if integration_type in destinations_to_test:
test_root_dir = f"{pathlib.Path().absolute()}/normalization_test_output/{integration_type.lower()}"
@@ -39,11 +45,11 @@ def before_all_tests(request):
dbt_test_utils.setup_db(destinations_to_test)
os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"]
yield
dbt_test_utils.clean_tmp_tables(**clean_up_args)
dbt_test_utils.tear_down_db()
for folder in temporary_folders:
print(f"Deleting temporary test folder {folder}")
shutil.rmtree(folder, ignore_errors=True)
# TODO delete target_schema in destination by copying dbt_project.yml and injecting a on-run-end hook to clean up


@pytest.fixture
@@ -78,6 +84,9 @@ def test_normalization(destination_type: DestinationType, test_resource_name: st
if destination_type.value == DestinationType.ORACLE.value:
# Oracle does not allow changing to random schema
dbt_test_utils.set_target_schema("test_normalization")
elif destination_type.value == DestinationType.REDSHIFT.value:
# set unique schema for Redshift test
dbt_test_utils.set_target_schema(dbt_test_utils.generate_random_string("test_normalization_"))
try:
run_test_normalization(destination_type, test_resource_name)
finally:
@@ -498,6 +507,11 @@ def to_lower_identifier(input: re.Match) -> str:

def test_redshift_normalization_migration(tmp_path, setup_test_path):
destination_type = DestinationType.REDSHIFT
clean_up_args = {
"destination_type": [destination_type],
"test_type": "ephemeral", # "ephemeral", because we parse /tmp folders
"tmp_folders": [str(tmp_path)],
}
if destination_type.value not in dbt_test_utils.get_test_targets():
pytest.skip(f"Destination {destination_type} is not in NORMALIZATION_TEST_TARGET env variable")
base_dir = pathlib.Path(os.path.realpath(os.path.join(__file__, "../..")))
@@ -535,3 +549,5 @@ def test_redshift_normalization_migration(tmp_path, setup_test_path):
run_destination_process(destination_type, tmp_path, messages_file2, "destination_catalog.json", docker_tag="dev")
dbt_test_utils.dbt_run(destination_type, tmp_path, force_full_refresh=False)
dbt_test(destination_type, tmp_path)
# clean up the test tables created for this test
dbt_test_utils.clean_tmp_tables(**clean_up_args)