Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clickhouse profile mapping #353

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cosmos/profiles/__init__.py
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
from .trino.certificate import TrinoCertificateProfileMapping
from .trino.jwt import TrinoJWTProfileMapping
from .trino.ldap import TrinoLDAPProfileMapping
from .clickhouse.user_pass import ClickhouseUserPasswordProfileMapping

profile_mappings: list[Type[BaseProfileMapping]] = [
GoogleCloudServiceAccountFileProfileMapping,
@@ -29,6 +30,7 @@
TrinoLDAPProfileMapping,
TrinoCertificateProfileMapping,
TrinoJWTProfileMapping,
ClickhouseUserPasswordProfileMapping,
]


5 changes: 5 additions & 0 deletions cosmos/profiles/clickhouse/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"Generic Airflow connection -> dbt profile mappings"

from .user_pass import ClickhouseUserPasswordProfileMapping

__all__ = ["ClickhouseUserPasswordProfileMapping"]
55 changes: 55 additions & 0 deletions cosmos/profiles/clickhouse/user_pass.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"Maps Airflow Postgres connections using user + password authentication to dbt profiles."
from __future__ import annotations

from typing import Any

from ..base import BaseProfileMapping


class ClickhouseUserPasswordProfileMapping(BaseProfileMapping):
"""
Maps Airflow generic connections using user + password authentication to dbt Clickhouse profiles.
https://docs.getdbt.com/docs/core/connect-data-platform/clickhouse-setup
"""

airflow_connection_type: str = "generic"
default_port = 9000

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

required_fields = [
"host",
"login",
"schema",
"clickhouse",
]
secret_fields = [
"password",
]
airflow_param_mapping = {
"host": "host",
"login": "login",
"password": "password",
"port": "port",
"schema": "schema",
"clickhouse": "extra.clickhouse",
}

@property
def profile(self) -> dict[str, Any | None]:
"Gets profile. The password is stored in an environment variable."
profile = {
"type": "clickhouse",
"schema": self.conn.schema,
"login": self.conn.login,
# password should always get set as env var
"password": self.get_env_var_format("password"),
"driver": self.conn.extra_dejson.get("driver") or "native",
"port": self.conn.port or self.default_port,
"host": self.conn.host,
"secure": self.conn.extra_dejson.get("secure") or False,
"keepalives_idle": self.conn.extra_dejson.get("keepalives_idle"),
"sslmode": self.conn.extra_dejson.get("sslmode"),
"clickhouse": self.conn.extra_dejson.get("clickhouse"),
**self.profile_args,
}

return self.filter_null(profile)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, could you add the mock_profile function & a test for it?
https://github.com/astronomer/astronomer-cosmos/blob/3786703609e69c1e8f4b2db1475fe8b6ea00a117/cosmos/profiles/base.py#L97C7-L97C7

This was introduced after this PR was created and is now required.

10 changes: 10 additions & 0 deletions docs/dbt/connections-profiles.rst
Original file line number Diff line number Diff line change
@@ -190,3 +190,13 @@ Certificate
:undoc-members:
:members:
:show-inheritance:

Clickhouse
----------

Username and Password
~~~~~~~~~~~~~~~~~~~~~~

.. autoclass:: cosmos.profiles.Clickhouse.ClickhouseUserPasswordProfileMapping
:undoc-members:
:members:
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -51,8 +51,12 @@ dbt-all = [
"dbt-redshift",
"dbt-snowflake",
"dbt-spark",
"dbt-clickhouse",
"astronomer-cosmos[dbt-openlineage]"
]
dbt-clickhouse = [
"dbt-clickhouse",
]
dbt-bigquery = [
"dbt-bigquery",
]
119 changes: 119 additions & 0 deletions tests/profiles/clickhouse/test_clickhouse_userpass.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"Tests for the clickhouse profile."

from unittest.mock import patch

import pytest
from airflow.models.connection import Connection

from cosmos.profiles import get_profile_mapping
from cosmos.profiles.clickhouse.user_pass import (
ClickhouseUserPasswordProfileMapping,
)


@pytest.fixture()
def mock_clickhouse_conn(): # type: ignore
"""
Sets the connection as an environment variable.
"""
conn = Connection(
conn_id="clickhouse_connection",
conn_type="generic",
host="my_host",
login="my_user",
password="my_password",
schema="my_database",
extra='{"clickhouse": "True"}',
)

with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn):
yield conn


def test_connection_claiming() -> None:
"""
Tests that the clickhose profile mapping claims the correct connection type.
"""
# should only claim when:
# - conn_type == generic
# and the following exist:
# - host
# - login
# - password
# - schema
# - extra.clickhouse
potential_values = {
"conn_type": "generic",
"host": "my_host",
"login": "my_user",
"schema": "my_database",
"extra": '{"clickhouse": "True"}',
}

# if we're missing any of the values, it shouldn't claim
for key in potential_values:
values = potential_values.copy()
del values[key]
conn = Connection(**values) # type: ignore

print("testing with", values)

profile_mapping = ClickhouseUserPasswordProfileMapping(conn, {})
assert not profile_mapping.can_claim_connection()

# if we have them all, it should claim
conn = Connection(**potential_values) # type: ignore
profile_mapping = ClickhouseUserPasswordProfileMapping(conn, {})
assert profile_mapping.can_claim_connection()


def test_profile_mapping_selected(
mock_clickhouse_conn: Connection,
) -> None:
"""
Tests that the correct profile mapping is selected.
"""
profile_mapping = get_profile_mapping(
mock_clickhouse_conn.conn_id,
{},
)
assert isinstance(profile_mapping, ClickhouseUserPasswordProfileMapping)


def test_profile_args(
mock_clickhouse_conn: Connection,
) -> None:
"""
Tests that the profile values get set correctly.
"""
profile_mapping = get_profile_mapping(
mock_clickhouse_conn.conn_id,
profile_args={},
)

assert profile_mapping.profile == {
"type": "clickhouse",
"schema": mock_clickhouse_conn.schema,
"login": mock_clickhouse_conn.login,
"password": "{{ env_var('COSMOS_CONN_GENERIC_PASSWORD') }}",
"driver": "native",
"port": 9000,
"host": mock_clickhouse_conn.host,
"secure": False,
"clickhouse": "True",
}


def test_profile_env_vars(
mock_clickhouse_conn: Connection,
) -> None:
"""
Tests that the environment variables get set correctly.
"""
profile_mapping = get_profile_mapping(
mock_clickhouse_conn.conn_id,
profile_args={},
)
assert profile_mapping.env_vars == {
"COSMOS_CONN_GENERIC_PASSWORD": mock_clickhouse_conn.password,
}