Adding namespace fides_meta support for BigQuery datasets #5294

Merged: 16 commits, Sep 25, 2024
Changes from 4 commits
2 changes: 1 addition & 1 deletion requirements.txt
@@ -16,7 +16,7 @@ types-defusedxml==0.7.0.20240218
 expandvars==0.9.0
 fastapi[all]==0.111.0
 fastapi-pagination[sqlalchemy]==0.12.25
-fideslang==3.0.4
+fideslang @ git+https://github.com/ethyca/fideslang.git@0d8c203295d6d427b9274db5d9b8815065bdf75b
 fideslog==1.2.10
 firebase-admin==5.3.0
 GitPython==3.1.41
@@ -42,8 +42,8 @@ class BigQuerySchema(ConnectionConfigSecretsSchema):
     )
     dataset: Optional[str] = Field(
         default=None,
-        title="BigQuery Dataset",
-        description="The dataset within your BigQuery project that contains the tables you want to access.",
+        title="Default BigQuery Dataset",
+        description="The default BigQuery dataset that will be used if one isn't provided in the associated Fides datasets.",
     )
 
     _required_components: ClassVar[List[str]] = ["keyfile_creds"]
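
The new field description implies a fallback: a namespace declared on a Fides dataset, when present, takes precedence over this connection-level default. A rough sketch of that semantics, using a hypothetical helper (the actual resolution happens in the connector code below, not in this schema):

```python
from typing import Optional

# Hypothetical helper illustrating the fallback described in the new field
# description; not code from this PR.
def resolve_dataset_id(
    namespace_meta: Optional["BigQueryNamespaceMeta"],
    connection_secrets: dict,
) -> Optional[str]:
    # A namespace declared in the Fides dataset's fides_meta wins...
    if namespace_meta is not None:
        return namespace_meta.dataset_id
    # ...otherwise fall back to the connection-level "dataset" secret.
    return connection_secrets.get("dataset")
```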
Empty file.
18 changes: 18 additions & 0 deletions src/fides/api/schemas/namespace_meta/bigquery_namespace_meta.py
@@ -0,0 +1,18 @@
from typing import Optional

from fides.api.schemas.base_class import FidesSchema


class BigQueryNamespaceMeta(FidesSchema):
    """
    Represents the namespace structure for BigQuery queries.

    Attributes:
        project_id (Optional[str]): The ID of the Google Cloud project.
            This is optional as queries within the same project may omit it.
        dataset_id (str): The ID of the BigQuery dataset. This is required
            for all BigQuery queries to specify the dataset being queried.
    """

    project_id: Optional[str] = None
    dataset_id: str

Review thread on BigQueryNamespaceMeta:

galvana (author): I'm only using this in a limited scope. It'd be nice to use this to validate datasets when we link them to a specific connection config, but I'm removing that from scope for now.

Reviewer: Nice, that's an interesting idea. Somewhat related to my comment on the plus PR here.

Review thread on project_id:

Reviewer: OK, I think this makes sense, though I will say it's a bit surprising to me, since the BQ dataset does need to be associated with a project in reality. I know we may not need that for DSR processing, but that seems to be a bit of an implementation detail rather than a fact about the dataset itself, and I feel like the dataset definition should try to describe the dataset itself as accurately as possible.

That being said, I realize that requiring this may break backward compatibility and in general leave things less flexible, so I'm not strongly recommending we change it. I'm good with it as it is, ultimately; just wanted to throw in my two cents and see what you think.

galvana (author): You're right, I think it'd be better for this to be explicit 👍
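
A quick usage sketch of the schema's contract (hypothetical, assuming FidesSchema is a Pydantic model, which the model_validate call later in this PR suggests): omitting project_id is allowed, omitting dataset_id is not.

```python
from pydantic import ValidationError

# project_id may be omitted; it defaults to None.
meta = BigQueryNamespaceMeta(dataset_id="fidesopstest")
assert meta.project_id is None

# dataset_id is required, so this raises a ValidationError.
try:
    BigQueryNamespaceMeta(project_id="silken-precinct-284918")
except ValidationError as exc:
    print(exc)
```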
40 changes: 34 additions & 6 deletions src/fides/api/service/connectors/query_config.py
@@ -21,6 +21,9 @@
 from fides.api.graph.execution import ExecutionNode
 from fides.api.models.policy import Policy, Rule
 from fides.api.models.privacy_request import ManualAction, PrivacyRequest
+from fides.api.schemas.namespace_meta.bigquery_namespace_meta import (
+    BigQueryNamespaceMeta,
+)
 from fides.api.schemas.policy import ActionType
 from fides.api.service.masking.strategy.masking_strategy import MaskingStrategy
 from fides.api.service.masking.strategy.masking_strategy_nullify import (
@@ -810,14 +813,41 @@ class BigQueryQueryConfig(QueryStringWithoutTuplesOverrideQueryConfig):
     """
     Generates SQL valid for BigQuery
     """
 
+    def __init__(
+        self,
+        node: ExecutionNode,
+        namespace_meta: Optional[BigQueryNamespaceMeta] = None,
+    ):
+        """
+        Accepts an optional namespace_meta param to be able to specify dataset and project IDs for the generated queries.
+        """
+        super().__init__(node)
+        self.namespace_meta = namespace_meta
+
+    def _generate_table_name(self) -> str:
+        """
+        Prepends the dataset ID and project ID to the base table name
+        if the BigQuery namespace meta is provided.
+        """
+
+        table_name = self.node.collection.name
+        if self.namespace_meta:
+            table_name = f"{self.namespace_meta.dataset_id}.{table_name}"
+            if project_id := self.namespace_meta.project_id:
+                table_name = f"{project_id}.{table_name}"
+        return table_name

Review thread on the new namespace_meta parameter:

galvana (author): Updating the init to include an optional namespace_meta object.

Reviewer: Could we make this support a bit more generic? Following the pattern we've taken on the D&D side, it feels like we could support a (less strongly-typed) namespace_meta attribute on the SQLQueryConfig generically, and rely on the implementation/subclass to make use of the namespace_meta as it sees fit, i.e. in the datasource-specific way. The fact that you've already typed Dataset.fides_meta.namespace as a generic Dict should support this pattern pretty well. What do you think? Maybe it doesn't need to be something we cover now, although I'd kinda like to see it, since I feel like it will only get more cumbersome to implement if we don't update it now.

galvana (author): This is valid, I went ahead and made this change. Let me know what you think of my implementation.

     def get_formatted_query_string(
         self,
         field_list: str,
         clauses: List[str],
     ) -> str:
-        """Returns a query string with backtick formatting for tables that have the same names as
-        BigQuery reserved words."""
-        return f'SELECT {field_list} FROM `{self.node.collection.name}` WHERE {" OR ".join(clauses)}'
+        """
+        Returns a query string with backtick formatting for tables that have the same names as
+        BigQuery reserved words.
+        """
+
+        return f'SELECT {field_list} FROM `{self._generate_table_name()}` WHERE {" OR ".join(clauses)}'
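
To make the effect concrete, here is a rough sketch of what the namespace-qualified naming produces. The node object is hypothetical (assume its collection is named "customer"); the namespace values match the test fixtures later in this PR.

```python
meta = BigQueryNamespaceMeta(
    project_id="silken-precinct-284918", dataset_id="fidesopstest"
)
config = BigQueryQueryConfig(node, meta)

config._generate_table_name()
# -> "silken-precinct-284918.fidesopstest.customer"

config.get_formatted_query_string("email, name", ["email = :email"])
# -> 'SELECT email, name FROM `silken-precinct-284918.fidesopstest.customer`
#     WHERE email = :email'

# Without namespace meta, behavior is unchanged from before this change:
BigQueryQueryConfig(node)._generate_table_name()
# -> "customer"
```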

     def generate_update(
         self, row: Row, policy: Policy, request: PrivacyRequest, client: Engine

@@ -843,9 +873,7 @@ def generate_update(
             )
             return None
 
-        table = Table(
-            self.node.address.collection, MetaData(bind=client), autoload=True
-        )
+        table = Table(self._generate_table_name(), MetaData(bind=client), autoload=True)
         pk_clauses: List[ColumnElement] = [
             getattr(table.c, k) == v for k, v in non_empty_primary_keys.items()
         ]
32 changes: 30 additions & 2 deletions src/fides/api/service/connectors/sql_connector.py
@@ -14,7 +14,8 @@
 from google.oauth2 import service_account
 from loguru import logger
 from snowflake.sqlalchemy import URL as Snowflake_URL
-from sqlalchemy import Column, text
+from sqlalchemy import Column, select, text
+from sqlalchemy.dialects.postgresql import JSONB
 from sqlalchemy.engine import ( # type: ignore
     URL,
     Connection,

@@ -24,6 +24,7 @@
     create_engine,
 )
 from sqlalchemy.exc import InternalError, OperationalError
+from sqlalchemy.orm import Session
 from sqlalchemy.sql import Executable # type: ignore
 from sqlalchemy.sql.elements import TextClause
@@ -57,6 +59,9 @@
 from fides.api.schemas.connection_configuration.connection_secrets_mysql import (
     MySQLSchema,
 )
+from fides.api.schemas.namespace_meta.bigquery_namespace_meta import (
+    BigQueryNamespaceMeta,
+)
 from fides.api.service.connectors.base_connector import BaseConnector
 from fides.api.service.connectors.query_config import (
     BigQueryQueryConfig,

@@ -71,6 +76,10 @@
 from fides.api.util.collection_util import Row
 from fides.config import get_config
 
+from fides.api.models.sql_models import ( # type: ignore[attr-defined] # isort: skip
+    Dataset as CtlDataset,
+)
+
 CONFIG = get_config()
 
 sshtunnel.SSH_TIMEOUT = CONFIG.security.bastion_server_ssh_timeout

@@ -115,6 +124,18 @@ def default_cursor_result_to_rows(results: LegacyCursorResult) -> List[Row]:
             rows.append({col[0]: row_tuple[count] for count, col in enumerate(columns)})
         return rows
 
+    @staticmethod
+    def get_namespace_meta(db: Session, dataset: str) -> Optional[Dict[str, Any]]:
+        """
+        Util function to return the namespace meta for a given ctl_dataset.
+        """
+
+        return db.scalar(
+            select(CtlDataset.fides_meta["namespace"].cast(JSONB)).where(
+                CtlDataset.fides_key == dataset
+            )
+        )
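
For example, with a ctl_dataset like the one created in the unit test below, the helper returns the raw namespace dict (the fides_key here is hypothetical):

```python
SQLConnector.get_namespace_meta(db, "my_dataset_key")
# -> {"dataset_id": "public"}, or None if the dataset declares no namespace
```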

     @abstractmethod
     def build_uri(self) -> Optional[str]:
         """Build a database specific uri connection string"""

@@ -529,7 +550,14 @@ def create_client(self) -> Engine:
     # Overrides SQLConnector.query_config
     def query_config(self, node: ExecutionNode) -> BigQueryQueryConfig:
         """Query wrapper corresponding to the input execution_node."""
-        return BigQueryQueryConfig(node)
+
+        db: Session = Session.object_session(self.configuration)
+        namespace_meta: Optional[BigQueryNamespaceMeta] = None
+
+        if raw_meta := SQLConnector.get_namespace_meta(db, node.address.dataset):
+            namespace_meta = BigQueryNamespaceMeta(**raw_meta)
+
+        return BigQueryQueryConfig(node, namespace_meta)

Review thread on casting raw_meta:

Reviewer: Right, so what I'm thinking is that instead of "casting" the raw_meta here, we'd cast it within the scope of the BigQueryQueryConfig, such that we could initialize a generic namespace_meta attribute generically on the SQLConnector base class.

galvana (author), Sep 24, 2024: I updated it to this:

```python
db: Session = Session.object_session(self.configuration)
return BigQueryQueryConfig(
    node, SQLConnector.get_namespace_meta(db, node.address.dataset)
)
```

The SQLLikeQueryConfig super class validates the meta dict based on the namespace_meta_schema defined at the BigQueryQueryConfig:

```python
class SQLLikeQueryConfig(QueryConfig[T], ABC):
    """
    Abstract query config for SQL-like languages (that may not be strictly SQL).
    """

    namespace_meta_schema: Optional[Type[NamespaceMeta]] = None

    def __init__(self, node: ExecutionNode, namespace_meta: Optional[Dict] = None):
        super().__init__(node)
        self.namespace_meta: Optional[NamespaceMeta] = None

        if namespace_meta is not None:
            if self.namespace_meta_schema is None:
                raise MissingNamespaceSchemaException(
                    f"{self.__class__.__name__} must define a namespace_meta_schema when namespace_meta is provided."
                )
            try:
                self.namespace_meta = self.namespace_meta_schema.model_validate(
                    namespace_meta
                )
            except ValidationError as exc:
                raise ValueError(f"Invalid namespace_meta: {exc}")
```
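
Under that generic pattern, the BigQuery-specific config presumably only needs to declare which schema validates its namespace meta. A sketch inferred from the comment above (the final merged code may differ):

```python
# Inferred sketch, not the literal diff: BigQueryQueryConfig opts into the
# generic SQLLikeQueryConfig validation by naming its namespace meta schema.
class BigQueryQueryConfig(QueryStringWithoutTuplesOverrideQueryConfig):
    namespace_meta_schema: Optional[Type[NamespaceMeta]] = BigQueryNamespaceMeta
```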

     # Overrides SQLConnector.test_connection
     def test_connection(self) -> Optional[ConnectionTestStatus]:
17 changes: 17 additions & 0 deletions tests/ctl/core/test_dataset.py
@@ -384,6 +384,23 @@ def test_field_data_categories(db) -> None:
     assert ctl_dataset.field_data_categories
 
 
+@pytest.mark.unit
+def test_namespace_meta(db) -> None:
+    ctl_dataset = CtlDataset.create_from_dataset_dict(
+        db,
+        {
+            "fides_key": f"dataset_key-f{uuid4()}",
+            "fides_meta": {"namespace": {"dataset_id": "public"}},
+            "collections": [],
+        },
+    )
+    assert ctl_dataset.fides_meta == {
+        "resource_id": None,
+        "after": None,
+        "namespace": {"dataset_id": "public"},
+    }
+
+
 # Generate Dataset Database Integration Tests
 
 # These URLs are for the databases in the docker-compose.integration-tests.yml file
112 changes: 112 additions & 0 deletions tests/fixtures/bigquery_fixtures.py
@@ -61,6 +61,30 @@ def bigquery_connection_config(db: Session) -> Generator:
     connection_config.delete(db)
 
 
+@pytest.fixture(scope="function")
+def bigquery_connection_config_without_default_dataset(db: Session) -> Generator:
+    connection_config = ConnectionConfig.create(
+        db=db,
+        data={
+            "name": str(uuid4()),
+            "key": "my_bigquery_config",
+            "connection_type": ConnectionType.bigquery,
+            "access": AccessLevel.write,
+        },
+    )
+    # Pulling from integration config file or GitHub secrets
+    keyfile_creds = integration_config.get("bigquery", {}).get(
+        "keyfile_creds"
+    ) or ast.literal_eval(os.environ.get("BIGQUERY_KEYFILE_CREDS"))
+    if keyfile_creds:
+        schema = BigQuerySchema(keyfile_creds=keyfile_creds)
+        connection_config.secrets = schema.model_dump(mode="json")
+        connection_config.save(db=db)
+
+    yield connection_config
+    connection_config.delete(db)

 @pytest.fixture
 def bigquery_example_test_dataset_config(
     bigquery_connection_config: ConnectionConfig,

@@ -88,6 +112,39 @@ def bigquery_example_test_dataset_config(
     ctl_dataset.delete(db=db)
 

+@pytest.fixture
+def bigquery_example_test_dataset_config_with_namespace_meta(
+    bigquery_connection_config_without_default_dataset: ConnectionConfig,
+    db: Session,
+    example_datasets: List[Dict],
+) -> Generator:
+    bigquery_dataset = example_datasets[7]
+    bigquery_dataset["fides_meta"] = {
+        "namespace": {
+            "project_id": "silken-precinct-284918",
+            "dataset_id": "fidesopstest",
+        }
+    }
+    fides_key = bigquery_dataset["fides_key"]
+    bigquery_connection_config_without_default_dataset.name = fides_key
+    bigquery_connection_config_without_default_dataset.key = fides_key
+    bigquery_connection_config_without_default_dataset.save(db=db)
+
+    ctl_dataset = CtlDataset.create_from_dataset_dict(db, bigquery_dataset)
+
+    dataset = DatasetConfig.create(
+        db=db,
+        data={
+            "connection_config_id": bigquery_connection_config_without_default_dataset.id,
+            "fides_key": fides_key,
+            "ctl_dataset_id": ctl_dataset.id,
+        },
+    )
+    yield dataset
+    dataset.delete(db=db)
+    ctl_dataset.delete(db=db)


 @pytest.fixture(scope="function")
 def bigquery_resources(
     bigquery_example_test_dataset_config,

@@ -140,6 +197,61 @@ def bigquery_resources(
         connection.execute(stmt)
 

+@pytest.fixture(scope="function")
+def bigquery_resources_with_namespace_meta(
+    bigquery_example_test_dataset_config_with_namespace_meta,
+):
+    bigquery_connection_config = (
+        bigquery_example_test_dataset_config_with_namespace_meta.connection_config
+    )
+    connector = BigQueryConnector(bigquery_connection_config)
+    bigquery_client = connector.client()
+    with bigquery_client.connect() as connection:
+        uuid = str(uuid4())
+        customer_email = f"customer-{uuid}@example.com"
+        customer_name = f"{uuid}"
+
+        stmt = "select max(id) from fidesopstest.customer;"
+        res = connection.execute(stmt)
+        customer_id = res.all()[0][0] + 1
+
+        stmt = "select max(id) from fidesopstest.address;"
+        res = connection.execute(stmt)
+        address_id = res.all()[0][0] + 1
+
+        city = "Test City"
+        state = "TX"
+        stmt = f"""
+        insert into fidesopstest.address (id, house, street, city, state, zip)
+        values ({address_id}, '{111}', 'Test Street', '{city}', '{state}', '55555');
+        """
+        connection.execute(stmt)
+
+        stmt = f"""
+        insert into fidesopstest.customer (id, email, name, address_id)
+        values ({customer_id}, '{customer_email}', '{customer_name}', {address_id});
+        """
+        connection.execute(stmt)
+
+        yield {
+            "email": customer_email,
+            "name": customer_name,
+            "id": customer_id,
+            "client": bigquery_client,
+            "address_id": address_id,
+            "city": city,
+            "state": state,
+            "connector": connector,
+            "dataset": bigquery_example_test_dataset_config_with_namespace_meta.fides_key,
+        }
+        # Remove test data and close BigQuery connection in teardown
+        stmt = f"delete from fidesopstest.customer where email = '{customer_email}';"
+        connection.execute(stmt)
+
+        stmt = f"delete from fidesopstest.address where id = {address_id};"
+        connection.execute(stmt)


 @pytest.fixture(scope="session")
 def bigquery_test_engine() -> Generator:
     """Return a connection to a Google BigQuery Warehouse"""