Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updated create table logic for using proper dbt render #550

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dbt/include/sqlserver/macros/relations/table/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
{{ get_create_view_as_sql(tmp_relation, sql) }}

{%- set table_name -%}
{{ relation.database}}.{{ relation.schema }}.{{ relation.identifier }}
{{ relation }}
{%- endset -%}


{%- set contract_config = config.get('contract') -%}
{%- set query -%}
{% if contract_config.enforced and (not temporary) %}
Expand Down
116 changes: 116 additions & 0 deletions tests/functional/adapter/mssql/test_db_non_standard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import pytest
from dbt.tests.util import get_connection, run_dbt

database_name = "my-data-base"
schema_name = "mysource"
source_table_name = "my_table"

sources_yml = f"""
version: 2

sources:
- name: mysource
database: {database_name}
tables:
- name: my_table

"""

model_sql = """
{{ config(database="my-data-base", schema="mysource", materialized="table") }}
SELECT
*
FROM
{{ source('mysource', 'my_table') }}
"""


class TestNonStandardDB:
def create_db(self, project):
create_sql = """
DECLARE @col NVARCHAR(256)
SET @col = (SELECT CONVERT (varchar(256), SERVERPROPERTY('collation')));

IF NOT EXISTS (SELECT * FROM sys.databases WHERE name='{database}')
BEGIN
EXEC ('CREATE DATABASE [{database}] COLLATE ' + @col)
END
"""

with get_connection(project.adapter):
project.adapter.execute(
create_sql.format(database=database_name),
fetch=True,
)

def create_source_schema(self, project):
create_sql = """
USE [{database}];

IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{schema}')
BEGIN
EXEC('CREATE SCHEMA {schema}')
END
"""
with get_connection(project.adapter):
project.adapter.execute(
create_sql.format(database=database_name, schema=schema_name),
fetch=True,
)

def create_primary_table(self, project):
src_query = """
SELECT *
INTO
[{database}].{schema}.{table}
FROM
(
SELECT
1 as id,
CAST('2024-01-01' as DATETIME2(6)) updated_at

UNION ALL

SELECT
2 as id,
CAST('2024-01-01' as DATETIME2(6)) updated_at

UNION ALL

SELECT
3 as id,
CAST('2024-01-01' as DATETIME2(6)) updated_at
) as src_data
"""
with get_connection(project.adapter):
project.adapter.execute(
src_query.format(
database=database_name, schema=schema_name, table=source_table_name
),
fetch=True,
)

def cleanup_primary_table(self, project):
drop_sql = "DROP TABLE IF EXISTS [{database}].{schema}.{table}"
with get_connection(project.adapter):
project.adapter.execute(
drop_sql.format(
database=database_name, schema=schema_name, table=source_table_name
),
fetch=True,
)

@pytest.fixture(scope="class")
def models(self):
return {"model.sql": model_sql, "sources.yml": sources_yml}

def test_non_standard_database(self, project):
self.create_db(project)

self.cleanup_primary_table(project)
self.create_source_schema(project)
self.create_primary_table(project)

run_dbt()

self.cleanup_primary_table(project)