Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed logic for snapshot and added tests #537

Merged
merged 3 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

{% set temp_snapshot_relation_sql = model['compiled_code'].replace("'", "''") %}
{% call statement('create temp_snapshot_relation') %}
USE [{{ model.database}}];
EXEC('DROP VIEW IF EXISTS {{ temp_snapshot_relation.include(database=False) }};');
EXEC('create view {{ temp_snapshot_relation.include(database=False) }} as {{ temp_snapshot_relation_sql }};');
{% endcall %}
Expand Down
1 change: 1 addition & 0 deletions dbt/include/sqlserver/macros/relations/table/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{%- set tmp_relation = relation.incorporate(path={"identifier": relation.identifier ~ '__dbt_tmp_vw'}, type='view') -%}

{%- do adapter.drop_relation(tmp_relation) -%}
USE [{{ relation.database }}];
{{ get_create_view_as_sql(tmp_relation, sql) }}

{%- set table_name -%}
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/sqlserver/macros/relations/views/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
{% set tst %}
SELECT '1' as col
{% endset %}

USE [{{ relation.database }}];
EXEC('{{- escape_single_quotes(query) -}}')

{% endmacro %}
170 changes: 170 additions & 0 deletions tests/functional/adapter/mssql/test_cross_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import pytest
from dbt.tests.util import get_connection, run_dbt

snapshot_sql = """
{% snapshot claims_snapshot %}

{{
config(
target_database='secondary_db',
target_schema='dbo',
unique_key='id',

strategy='timestamp',
updated_at='updated_at',
)
}}

select * from {{source('mysource', 'claims')}}

{% endsnapshot %}
"""

source_csv = """id,updated_date
1,2024-01-01
2,2024-01-01
3,2024-01-01
"""

sources_yml = """
version: 2
sources:
- name: mysource
database: TestDB
tables:
- name: claims
"""


class TestCrossDB:
def create_secondary_db(self, project):
create_sql = """
DECLARE @col NVARCHAR(256)
SET @col = (SELECT CONVERT (varchar(256), SERVERPROPERTY('collation')));

IF NOT EXISTS (SELECT * FROM sys.databases WHERE name='secondary_db')
BEGIN
EXEC ('CREATE DATABASE secondary_db COLLATE ' + @col)
END
"""

with get_connection(project.adapter):
project.adapter.execute(
create_sql.format(database=project.database),
fetch=True,
)

def cleanup_secondary_database(self, project):
drop_sql = "DROP DATABASE IF EXISTS secondary_db"
with get_connection(project.adapter):
project.adapter.execute(
drop_sql.format(database=project.database),
fetch=True,
)

def cleanup_primary_table(self, project):
drop_sql = "DROP TABLE IF EXISTS {database}.mysource.claims"
with get_connection(project.adapter):
project.adapter.execute(
drop_sql.format(database=project.database),
fetch=True,
)

def cleanup_snapshot_table(self, project):
drop_sql = "DROP TABLE IF EXISTS TestDB_Secondary.dbo.claims_snapshot"
with get_connection(project.adapter):
project.adapter.execute(
drop_sql,
fetch=True,
)

def create_source_schema(self, project):
create_sql = """
IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = 'mysource')
BEGIN
EXEC('CREATE SCHEMA mysource')
END
"""
with get_connection(project.adapter):
project.adapter.execute(
create_sql,
fetch=True,
)

def create_primary_table(self, project):
src_query = """
SELECT *
INTO
{database}.mysource.claims
FROM
(
SELECT
1 as id,
CAST('2024-01-01' as DATETIME2(6)) updated_at

UNION ALL

SELECT
2 as id,
CAST('2024-01-01' as DATETIME2(6)) updated_at

UNION ALL

SELECT
3 as id,
CAST('2024-01-01' as DATETIME2(6)) updated_at
) as src_data
"""
with get_connection(project.adapter):
project.adapter.execute(
src_query.format(database=project.database, schema=project.test_schema),
fetch=True,
)

def create_secondary_schema(self, project):
src_query = """
USE [secondary_db]
EXEC ('CREATE SCHEMA {schema}')
"""
with get_connection(project.adapter):
project.adapter.execute(
src_query.format(database=project.database, schema=project.test_schema),
fetch=True,
)

def update_primary_table(self, project):
sql = """
UPDATE [{database}].[mysource].[claims]
SET
updated_at = CAST('2024-02-01' as datetime2(6))
WHERE
id = 3
"""
with get_connection(project.adapter):
project.adapter.execute(
sql.format(database=project.database),
fetch=True,
)

@pytest.fixture(scope="class")
def models(self):
return {"sources.yml": sources_yml}

@pytest.fixture(scope="class")
def snapshots(self):
return {"claims_snapshot.sql": snapshot_sql}

def test_cross_db_snapshot(self, project):
self.create_secondary_db(project)

self.cleanup_primary_table(project)
self.cleanup_snapshot_table(project)

self.create_source_schema(project)
self.create_primary_table(project)
run_dbt(["snapshot"])
self.update_primary_table(project)
run_dbt(["snapshot"])

self.cleanup_snapshot_table(project)
self.cleanup_secondary_database(project)