Skip to content

Commit

Permalink
alter the tests so we can run rpc tests on snowflake
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Beck committed Jul 27, 2020
1 parent 92a493c commit 3611f2d
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 36 deletions.
92 changes: 87 additions & 5 deletions test/rpc/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,48 @@
import pytest
import random
import time
from typing import Dict, Any
from typing import Dict, Any, Set

import yaml


def pytest_addoption(parser):
parser.addoption(
'--profile', default='postgres', help='Use the postgres profile',
)


def _get_item_profiles(item) -> Set[str]:
supported = set()
for mark in item.iter_markers(name='supported'):
supported.update(mark.args)
return supported


def pytest_collection_modifyitems(config, items):
selected_profile = config.getoption('profile')

to_remove = []

for item in items:
item_profiles = _get_item_profiles(item)
if selected_profile not in item_profiles and 'any' not in item_profiles:
to_remove.append(item)

for item in to_remove:
items.remove(item)


def pytest_configure(config):
config.addinivalue_line('markers', 'supported(: Marks postgres-only tests')
config.addinivalue_line(
'markers', 'snowflake: Mark snowflake-only tests'
)
config.addinivalue_line(
'markers', 'any: Mark '
)


@pytest.fixture
def unique_schema() -> str:
return "test{}{:04}".format(int(time.time()), random.randint(0, 9999))
Expand All @@ -22,7 +59,6 @@ def project_root(tmpdir):
return tmpdir.mkdir('project')


@pytest.fixture
def postgres_profile_data(unique_schema):
return {
'config': {
Expand All @@ -46,9 +82,55 @@ def postgres_profile_data(unique_schema):
}


def snowflake_profile_data(unique_schema):
return {
'config': {
'send_anonymous_usage_stats': False
},
'test': {
'outputs': {
'default': {
'type': 'snowflake',
'threads': 4,
'account': os.getenv('SNOWFLAKE_TEST_ACCOUNT'),
'user': os.getenv('SNOWFLAKE_TEST_USER'),
'password': os.getenv('SNOWFLAKE_TEST_PASSWORD'),
'database': os.getenv('SNOWFLAKE_TEST_DATABASE'),
'schema': unique_schema,
'warehouse': os.getenv('SNOWFLAKE_TEST_WAREHOUSE'),
},
'keepalives': {
'type': 'snowflake',
'threads': 4,
'account': os.getenv('SNOWFLAKE_TEST_ACCOUNT'),
'user': os.getenv('SNOWFLAKE_TEST_USER'),
'password': os.getenv('SNOWFLAKE_TEST_PASSWORD'),
'database': os.getenv('SNOWFLAKE_TEST_DATABASE'),
'schema': unique_schema,
'warehouse': os.getenv('SNOWFLAKE_TEST_WAREHOUSE'),
'client_session_keep_alive': True,
},
},
'target': 'default',
},
}


@pytest.fixture
def dbt_profile_data(unique_schema, pytestconfig):
profile_name = pytestconfig.getoption('profile')
if profile_name == 'postgres':
return postgres_profile_data(unique_schema)
elif profile_name == 'snowflake':
return snowflake_profile_data(unique_schema)
else:
print(f'Bad profile name {profile_name}!')
return {}


@pytest.fixture
def postgres_profile(profiles_root, postgres_profile_data) -> Dict[str, Any]:
def dbt_profile(profiles_root, dbt_profile_data) -> Dict[str, Any]:
path = os.path.join(profiles_root, 'profiles.yml')
with open(path, 'w') as fp:
fp.write(yaml.safe_dump(postgres_profile_data))
return postgres_profile_data
fp.write(yaml.safe_dump(dbt_profile_data))
return dbt_profile_data
4 changes: 3 additions & 1 deletion test/rpc/test_compile.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import pytest
from .util import (
assert_has_threads,
get_querier,
ProjectDefinition,
)


@pytest.mark.supported('postgres')
def test_rpc_compile_threads(
project_root, profiles_root, postgres_profile, unique_schema
project_root, profiles_root, dbt_profile, unique_schema
):
project = ProjectDefinition(
models={'my_model.sql': 'select 1 as id'}
Expand Down
4 changes: 3 additions & 1 deletion test/rpc/test_concurrency.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
import pytest

from .util import (
get_querier,
Expand All @@ -15,8 +16,9 @@ def _compile_poll_for_result(querier, id: int):
assert compile_sql_result['results'][0]['compiled_sql'] == sql


@pytest.mark.supported('postgres')
def test_rpc_compile_sql_concurrency(
project_root, profiles_root, postgres_profile, unique_schema
project_root, profiles_root, dbt_profile, unique_schema
):
project = ProjectDefinition(
models={'my_model.sql': 'select 1 as id'}
Expand Down
8 changes: 6 additions & 2 deletions test/rpc/test_deps.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import pytest

from .util import (
get_querier,
ProjectDefinition,
Expand Down Expand Up @@ -70,7 +72,8 @@ def deps_with_packages(packages, bad_packages, project_dir, profiles_dir, schema
querier.is_result(querier.async_wait(tok1))


def test_rpc_deps_packages(project_root, profiles_root, postgres_profile, unique_schema):
@pytest.mark.supported('postgres')
def test_rpc_deps_packages(project_root, profiles_root, dbt_profile, unique_schema):
packages = [{
'package': 'fishtown-analytics/dbt_utils',
'version': '0.2.1',
Expand All @@ -82,7 +85,8 @@ def test_rpc_deps_packages(project_root, profiles_root, postgres_profile, unique
deps_with_packages(packages, bad_packages, project_root, profiles_root, unique_schema)


def test_rpc_deps_git(project_root, profiles_root, postgres_profile, unique_schema):
@pytest.mark.supported('postgres')
def test_rpc_deps_git(project_root, profiles_root, dbt_profile, unique_schema):
packages = [{
'git': 'https://github.com/fishtown-analytics/dbt-utils.git',
'revision': '0.2.1'
Expand Down
22 changes: 15 additions & 7 deletions test/rpc/test_management.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import pytest
import time
from .util import (
get_querier,
ProjectDefinition,
)


@pytest.mark.supported('postgres')
def test_rpc_basics(
project_root, profiles_root, postgres_profile, unique_schema
project_root, profiles_root, dbt_profile, unique_schema
):
project = ProjectDefinition(
models={'my_model.sql': 'select 1 as id'}
Expand Down Expand Up @@ -62,7 +64,8 @@ def test_rpc_basics(
'''


def test_rpc_status_error(project_root, profiles_root, postgres_profile, unique_schema):
@pytest.mark.supported('postgres')
def test_rpc_status_error(project_root, profiles_root, dbt_profile, unique_schema):
project = ProjectDefinition(
models={
'descendant_model.sql': 'select * from {{ source("test_source", "test_table") }}',
Expand Down Expand Up @@ -130,7 +133,8 @@ def test_rpc_status_error(project_root, profiles_root, postgres_profile, unique_
querier.is_result(querier.compile_sql('select 1 as id'))


def test_gc_change_interval(project_root, profiles_root, postgres_profile, unique_schema):
@pytest.mark.supported('postgres')
def test_gc_change_interval(project_root, profiles_root, dbt_profile, unique_schema):
project = ProjectDefinition(
models={'my_model.sql': 'select 1 as id'}
)
Expand Down Expand Up @@ -176,7 +180,8 @@ def test_gc_change_interval(project_root, profiles_root, postgres_profile, uniqu
assert len(result['rows']) == 2


def test_ps_poll_output_match(project_root, profiles_root, postgres_profile, unique_schema):
@pytest.mark.supported('postgres')
def test_ps_poll_output_match(project_root, profiles_root, dbt_profile, unique_schema):
project = ProjectDefinition(
models={'my_model.sql': 'select 1 as id'}
)
Expand Down Expand Up @@ -248,8 +253,9 @@ def wait_for_log_ordering(querier, token, attempts, *messages) -> int:
assert False, msg


@pytest.mark.supported('postgres')
def test_get_status(
project_root, profiles_root, postgres_profile, unique_schema
project_root, profiles_root, dbt_profile, unique_schema
):
project = ProjectDefinition(
models={'my_model.sql': 'select 1 as id'},
Expand Down Expand Up @@ -294,8 +300,9 @@ def test_get_status(
assert len(result['logs']) == 0


@pytest.mark.supported('postgres')
def test_missing_tag_sighup(
project_root, profiles_root, postgres_profile, unique_schema
project_root, profiles_root, dbt_profile, unique_schema
):
project = ProjectDefinition(
models={
Expand Down Expand Up @@ -333,8 +340,9 @@ def test_missing_tag_sighup(
assert querier.wait_for_status('ready') is True


@pytest.mark.supported('postgres')
def test_get_manifest(
project_root, profiles_root, postgres_profile, unique_schema
project_root, profiles_root, dbt_profile, unique_schema
):
project = ProjectDefinition(
models={
Expand Down
10 changes: 7 additions & 3 deletions test/rpc/test_run.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import pytest
from .util import (
assert_has_threads,
get_querier,
ProjectDefinition,
)


@pytest.mark.supported('postgres')
def test_rpc_run_threads(
project_root, profiles_root, postgres_profile, unique_schema
project_root, profiles_root, dbt_profile, unique_schema
):
project = ProjectDefinition(
models={'my_model.sql': 'select 1 as id'}
Expand All @@ -28,8 +30,9 @@ def test_rpc_run_threads(
assert_has_threads(results, 7)


@pytest.mark.supported('postgres')
def test_rpc_run_vars(
project_root, profiles_root, postgres_profile, unique_schema
project_root, profiles_root, dbt_profile, unique_schema
):
project = ProjectDefinition(
models={
Expand All @@ -50,8 +53,9 @@ def test_rpc_run_vars(
assert results['results'][0]['node']['compiled_sql'] == 'select 100 as id'


@pytest.mark.supported('postgres')
def test_rpc_run_vars_compiled(
project_root, profiles_root, postgres_profile, unique_schema
project_root, profiles_root, dbt_profile, unique_schema
):
project = ProjectDefinition(
models={
Expand Down
8 changes: 6 additions & 2 deletions test/rpc/test_run_operation.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import pytest

from .util import (
get_querier,
ProjectDefinition,
Expand All @@ -16,8 +18,9 @@
'''


@pytest.mark.supported('postgres')
def test_run_operation(
project_root, profiles_root, postgres_profile, unique_schema
project_root, profiles_root, dbt_profile, unique_schema
):
project = ProjectDefinition(
models={'my_model.sql': 'select 1 as id'},
Expand Down Expand Up @@ -62,8 +65,9 @@ def test_run_operation(
assert poll_result['success'] is True


@pytest.mark.supported('postgres')
def test_run_operation_cli(
project_root, profiles_root, postgres_profile, unique_schema
project_root, profiles_root, dbt_profile, unique_schema
):
project = ProjectDefinition(
models={'my_model.sql': 'select 1 as id'},
Expand Down
43 changes: 43 additions & 0 deletions test/rpc/test_run_sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import pytest

from .util import (
get_querier,
ProjectDefinition,
)


@pytest.mark.supported('any')
def test_rpc_run_sql_nohang(
project_root, profiles_root, dbt_profile, unique_schema
):
project = ProjectDefinition(
models={'my_model.sql': 'select 1 as id'}
)
querier_ctx = get_querier(
project_def=project,
project_dir=project_root,
profiles_dir=profiles_root,
schema=unique_schema,
test_kwargs={},
)
with querier_ctx as querier:
querier.async_wait_for_result(querier.run_sql('select 1 as id'))


@pytest.mark.supported('snowflake')
def test_snowflake_rpc_run_sql_keepalive_nohang(
project_root, profiles_root, dbt_profile, unique_schema
):
project = ProjectDefinition(
models={'my_model.sql': 'select 1 as id'}
)
querier_ctx = get_querier(
project_def=project,
project_dir=project_root,
profiles_dir=profiles_root,
schema=unique_schema,
test_kwargs={},
target='keepalives',
)
with querier_ctx as querier:
querier.async_wait_for_result(querier.run_sql('select 1 as id'))
7 changes: 5 additions & 2 deletions test/rpc/test_seed.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import pytest
from .util import (
assert_has_threads,
get_querier,
ProjectDefinition,
)


@pytest.mark.supported('postgres')
def test_rpc_seed_threads(
project_root, profiles_root, postgres_profile, unique_schema
project_root, profiles_root, dbt_profile, unique_schema
):
project = ProjectDefinition(
project_data={'seeds': {'config': {'quote_columns': False}}},
Expand All @@ -30,8 +32,9 @@ def test_rpc_seed_threads(
assert_has_threads(results, 7)


@pytest.mark.supported('postgres')
def test_rpc_seed_include_exclude(
project_root, profiles_root, postgres_profile, unique_schema
project_root, profiles_root, dbt_profile, unique_schema
):
project = ProjectDefinition(
project_data={'seeds': {'config': {'quote_columns': False}}},
Expand Down
Loading

0 comments on commit 3611f2d

Please sign in to comment.