diff --git a/.fides/redis_dataset.yml b/.fides/redis_dataset.yml index 56f66e7bf5..c2c3831df3 100644 --- a/.fides/redis_dataset.yml +++ b/.fides/redis_dataset.yml @@ -20,6 +20,17 @@ dataset: data_categories: [system.operations] fidesops_meta: data_type: string[] # List of edges between the upstream collection and the current collection + - name: EN_DATA_USE_MAP__ + description: This map of traversed `Collection`s to associated `DataUse`s is stored and retrieved to be included in access request output packages. + data_qualifier: aggregated.anonymized.unlinked_pseudonymized.pseudonymized.identified + fidesops_meta: + data_type: object # Dict mapping `Collection` addresses -> set of associated `DataUse`s + fields: + - name: : # `Collection` address + data_qualifier: aggregated.anonymized.unlinked_pseudonymized.pseudonymized.identified + data_categories: [system.operations] + fidesops_meta: + data_type: string[] # set of `DataUse`s associated with this `Collection` - name: EN_EMAIL_INFORMATION________ # Usage: For building emails associated with email-connector datasets at the end of the privacy request. This encrypted raw information is retrieved from each relevant email-based collection and used to build a single email per email connector, with instructions on how to mask data on the given dataset. fidesops_meta: data_type: object # Stores how to locate and mask records for a given "email" collection. 
diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d1d039e33..23e943bd7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The types of changes are: ### Added - Empty state for when there are no relevant privacy notices in the privacy center [#3640](https://github.com/ethyca/fides/pull/3640) - Set `sslmode` to `prefer` if connecting to Redshift via ssh [#3685](https://github.com/ethyca/fides/pull/3685) +- Include `data_use` and `data_category` metadata in `upload` of access results [#3674](https://github.com/ethyca/fides/pull/3674) ### Fixed - Render linebreaks in the Fides.js overlay descriptions, etc. [#3665](https://github.com/ethyca/fides/pull/3665) diff --git a/src/fides/api/graph/graph.py b/src/fides/api/graph/graph.py index 7b62ee4d95..c53eba9371 100644 --- a/src/fides/api/graph/graph.py +++ b/src/fides/api/graph/graph.py @@ -18,6 +18,8 @@ SeedAddress, ) +DataCategoryFieldMapping = Dict[CollectionAddress, Dict[FidesKey, List[FieldPath]]] + class Node: """A traversal_node represents a single collection as a graph traversal_node. @@ -229,7 +231,7 @@ def __init__(self, *datasets: GraphDataset) -> None: @property def data_category_field_mapping( self, - ) -> Dict[CollectionAddress, Dict[FidesKey, List[FieldPath]]]: + ) -> DataCategoryFieldMapping: """ Maps the data_categories for each traversal_node to a list of field paths that have that same data category. 
diff --git a/src/fides/api/models/privacy_request.py b/src/fides/api/models/privacy_request.py index b513ef9eb3..adeebe1225 100644 --- a/src/fides/api/models/privacy_request.py +++ b/src/fides/api/models/privacy_request.py @@ -5,7 +5,7 @@ import json from datetime import datetime, timedelta from enum import Enum as EnumType -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Set, Union from celery.result import AsyncResult from loguru import logger @@ -660,6 +660,24 @@ def get_cached_access_graph(self) -> Optional[GraphRepr]: ] = cache.get_encoded_objects_by_prefix(f"ACCESS_GRAPH__{self.id}") return list(value_dict.values())[0] if value_dict else None + def cache_data_use_map(self, value: Dict[str, Set[str]]) -> None: + """ + Cache a dict of collections traversed in the privacy request + mapped to their associated data uses + """ + cache: FidesopsRedis = get_cache() + cache.set_encoded_object(f"DATA_USE_MAP__{self.id}", value) + + def get_cached_data_use_map(self) -> Optional[Dict[str, Set[str]]]: + """ + Fetch the collection -> data use map cached for this privacy request + """ + cache: FidesopsRedis = get_cache() + value_dict: Optional[ + Dict[str, Optional[Dict[str, Set[str]]]] + ] = cache.get_encoded_objects_by_prefix(f"DATA_USE_MAP__{self.id}") + return list(value_dict.values())[0] if value_dict else None + def trigger_policy_webhook( self, webhook: WebhookTypes, diff --git a/src/fides/api/service/privacy_request/request_runner_service.py b/src/fides/api/service/privacy_request/request_runner_service.py index dcfd964b09..b7d5116a63 100644 --- a/src/fides/api/service/privacy_request/request_runner_service.py +++ b/src/fides/api/service/privacy_request/request_runner_service.py @@ -231,6 +231,8 @@ def upload_access_results( # pylint: disable=R0912 privacy_request=privacy_request, data=filtered_results, storage_key=storage_destination.key, # type: ignore + 
data_category_field_mapping=dataset_graph.data_category_field_mapping, + data_use_map=privacy_request.get_cached_data_use_map(), ) if download_url: download_urls.append(download_url) diff --git a/src/fides/api/service/storage/storage_uploader_service.py b/src/fides/api/service/storage/storage_uploader_service.py index 6ce7dd8e2c..a94143add1 100644 --- a/src/fides/api/service/storage/storage_uploader_service.py +++ b/src/fides/api/service/storage/storage_uploader_service.py @@ -1,10 +1,11 @@ -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Set from fideslang.validation import FidesKey from loguru import logger from sqlalchemy.orm import Session from fides.api.common_exceptions import StorageUploadError +from fides.api.graph.graph import DataCategoryFieldMapping from fides.api.models.privacy_request import PrivacyRequest from fides.api.models.storage import StorageConfig from fides.api.schemas.storage.storage import ( @@ -17,7 +18,12 @@ def upload( - db: Session, *, privacy_request: PrivacyRequest, data: Dict, storage_key: FidesKey + db: Session, + privacy_request: PrivacyRequest, + data: Dict, + storage_key: FidesKey, + data_category_field_mapping: Optional[DataCategoryFieldMapping] = None, + data_use_map: Optional[Dict[str, Set[str]]] = None, ) -> str: """ Retrieves storage configs and calls appropriate upload method @@ -35,7 +41,9 @@ def upload( logger.warning("Storage type not found: {}", storage_key) raise StorageUploadError(f"Storage type not found: {storage_key}") uploader: Any = _get_uploader_from_config_type(config.type) # type: ignore - return uploader(db, config, data, privacy_request) + return uploader( + db, config, data, privacy_request, data_category_field_mapping, data_use_map + ) def get_extension(resp_format: ResponseFormat) -> str: @@ -76,7 +84,12 @@ def _get_uploader_from_config_type(storage_type: StorageType) -> Any: def _s3_uploader( - _: Session, config: StorageConfig, data: Dict, privacy_request: PrivacyRequest 
+ _: Session, + config: StorageConfig, + data: Dict, + privacy_request: PrivacyRequest, + data_category_field_mapping: Optional[DataCategoryFieldMapping] = None, + data_use_map: Optional[Dict[str, Set[str]]] = None, ) -> str: """Constructs necessary info needed for s3 before calling upload""" file_key: str = _construct_file_key(privacy_request.id, config) @@ -85,13 +98,18 @@ def _s3_uploader( auth_method = config.details[StorageDetails.AUTH_METHOD.value] return upload_to_s3( - config.secrets, data, bucket_name, file_key, config.format.value, privacy_request, auth_method # type: ignore + config.secrets, data, bucket_name, file_key, config.format.value, privacy_request, auth_method, data_category_field_mapping, data_use_map # type: ignore ) def _local_uploader( - _: Session, config: StorageConfig, data: Dict, privacy_request: PrivacyRequest + _: Session, + config: StorageConfig, + data: Dict, + privacy_request: PrivacyRequest, + data_category_field_mapping: Optional[DataCategoryFieldMapping] = None, + data_use_map: Optional[Dict[str, Set[str]]] = None, ) -> str: """Uploads data to local storage, used for quick-start/demo purposes""" file_key: str = _construct_file_key(privacy_request.id, config) - return upload_to_local(data, file_key, privacy_request, config.format.value) # type: ignore + return upload_to_local(data, file_key, privacy_request, config.format.value, data_category_field_mapping, data_use_map) # type: ignore diff --git a/src/fides/api/task/graph_task.py b/src/fides/api/task/graph_task.py index a91a7b4754..c156456dd7 100644 --- a/src/fides/api/task/graph_task.py +++ b/src/fides/api/task/graph_task.py @@ -38,6 +38,7 @@ from fides.api.models.connectionconfig import AccessLevel, ConnectionConfig from fides.api.models.policy import Policy from fides.api.models.privacy_request import ExecutionLogStatus, PrivacyRequest +from fides.api.models.sql_models import System # type: ignore[attr-defined] from fides.api.schemas.policy import ActionType from 
fides.api.service.connectors.base_connector import BaseConnector from fides.api.task.consolidate_query_matches import consolidate_query_matches @@ -174,6 +175,13 @@ def __init__( self.connector: BaseConnector = resources.get_connector( self.traversal_node.node.dataset.connection_key # ConnectionConfig.key ) + self.data_uses: Set[str] = ( + System.get_data_uses( + [self.connector.configuration.system], include_parents=False + ) + if self.connector.configuration.system + else {} + ) # build incoming edges to the form : [dataset address: [(foreign field, local field)] self.incoming_edges_by_collection: Dict[ @@ -658,6 +666,28 @@ def update_mapping_from_cache( ) +def _format_data_use_map_for_caching( + env: Dict[CollectionAddress, "GraphTask"] +) -> Dict[str, Set[str]]: + """ + Create a map of `Collection`s mapped to their associated `DataUse`s + to be stored in the cache. This is done before request execution, so that we + maintain the _original_ state of the graph as it's used for request execution. + The graph is subject to change "from underneath" the request execution runtime, + but we want to avoid picking up those changes in our data use map. + + `DataUse`s are associated with a `Collection` by means of the `System` + that's linked to a `Collection`'s `Connection` definition. + + Example: + { + : {"data_use_1", "data_use_2"}, + : {"data_use_1"}, + } + """ + return {collection.value: g_task.data_uses for collection, g_task in env.items()} + + def start_function(seed: List[Dict[str, Any]]) -> Callable[[], List[Dict[str, Any]]]: """Return a function for collections with no upstream dependencies, that just start with seed data. 
@@ -715,8 +745,16 @@ def termination_fn( privacy_request, env, end_nodes, resources, ActionType.access ) ) + + # cache access graph for use in logging/analytics event privacy_request.cache_access_graph(format_graph_for_caching(env, end_nodes)) + # cache a map of collections -> data uses for the output package of access requests + # this is cached here before request execution, since this is the state of the + # graph used for request execution. the graph could change _during_ request execution, + # but we don't want those changes in our data use map. + privacy_request.cache_data_use_map(_format_data_use_map_for_caching(env)) + v = delayed(get(dsk, TERMINATOR_ADDRESS, num_workers=1)) return v.compute() diff --git a/src/fides/api/tasks/storage.py b/src/fides/api/tasks/storage.py index 269029dc44..b0c48fc72a 100644 --- a/src/fides/api/tasks/storage.py +++ b/src/fides/api/tasks/storage.py @@ -5,7 +5,7 @@ import secrets import zipfile from io import BytesIO -from typing import Any, Dict, Union +from typing import Any, Dict, Optional, Set, Union import pandas as pd from boto3 import Session @@ -13,6 +13,7 @@ from loguru import logger from fides.api.cryptography.cryptographic_util import bytes_to_b64_str +from fides.api.graph.graph import DataCategoryFieldMapping from fides.api.models.privacy_request import PrivacyRequest from fides.api.schemas.storage.storage import ( ResponseFormat, @@ -133,6 +134,8 @@ def upload_to_s3( # pylint: disable=R0913 resp_format: str, privacy_request: PrivacyRequest, auth_method: S3AuthMethod, + data_category_field_mapping: Optional[DataCategoryFieldMapping] = None, + data_use_map: Optional[Dict[str, Set[str]]] = None, ) -> str: """Uploads arbitrary data to s3 returned from an access request""" logger.info("Starting S3 Upload of {}", file_key) @@ -171,6 +174,8 @@ def upload_to_local( file_key: str, privacy_request: PrivacyRequest, resp_format: str = ResponseFormat.json.value, + data_category_field_mapping: Optional[DataCategoryFieldMapping] = 
None, + data_use_map: Optional[Dict[str, Set[str]]] = None, ) -> str: """Uploads access request data to a local folder - for testing/demo purposes only""" if not os.path.exists(LOCAL_FIDES_UPLOAD_DIRECTORY): diff --git a/tests/fixtures/postgres_fixtures.py b/tests/fixtures/postgres_fixtures.py index e483b63afa..3ac0668026 100644 --- a/tests/fixtures/postgres_fixtures.py +++ b/tests/fixtures/postgres_fixtures.py @@ -19,6 +19,7 @@ PrivacyRequest, ) from fides.api.models.sql_models import Dataset as CtlDataset +from fides.api.models.sql_models import System from fides.api.service.connectors import PostgreSQLConnector from fides.config import CONFIG from tests.ops.test_helpers.db_utils import seed_postgres_data @@ -178,6 +179,7 @@ def disabled_connection_config( @pytest.fixture(scope="function") def read_connection_config( db: Session, + system: System, ) -> Generator: connection_config = ConnectionConfig.create( db=db, @@ -186,6 +188,7 @@ def read_connection_config( "key": "my_postgres_db_1_read_config", "connection_type": ConnectionType.postgres, "access": AccessLevel.read, + "system_id": system.id, "secrets": integration_secrets["postgres_example"], "description": "Read-only connection config", }, diff --git a/tests/ops/service/privacy_request/test_request_runner_service.py b/tests/ops/service/privacy_request/test_request_runner_service.py index ce6ff99211..ba5884c225 100644 --- a/tests/ops/service/privacy_request/test_request_runner_service.py +++ b/tests/ops/service/privacy_request/test_request_runner_service.py @@ -17,6 +17,7 @@ ClientUnsuccessfulException, PrivacyRequestPaused, ) +from fides.api.graph.config import CollectionAddress, FieldPath from fides.api.graph.graph import DatasetGraph from fides.api.models.application_config import ApplicationConfig from fides.api.models.audit_log import AuditLog, AuditLogAction @@ -311,6 +312,121 @@ def get_privacy_request_results( return PrivacyRequest.get(db=db, object_id=privacy_request.id) 
+@pytest.mark.integration_postgres +@pytest.mark.integration +@mock.patch("fides.api.service.privacy_request.request_runner_service.upload") +def test_upload_access_results_has_data_category_field_mapping( + upload_mock: Mock, + postgres_example_test_dataset_config_read_access, + postgres_integration_db, + db, + policy, + run_privacy_request_task, +): + """ + Ensure we are passing along a correctly populated data_category_field_mapping to the 'upload' function + that publishes the access request output. + """ + customer_email = "customer-1@example.com" + data = { + "requested_at": "2021-08-30T16:09:37.359Z", + "policy_key": policy.key, + "identity": {"email": customer_email}, + } + + pr = get_privacy_request_results( + db, + policy, + run_privacy_request_task, + data, + ) + + # sanity check that access results returned as expected + results = pr.get_results() + assert len(results.keys()) == 11 + + # what we're really testing - ensure data_category_field_mapping arg is well-populated + args, kwargs = upload_mock.call_args + data_category_field_mapping = kwargs["data_category_field_mapping"] + + # make sure the category field mapping generally looks as we expect + address_mapping = data_category_field_mapping[ + CollectionAddress.from_string("postgres_example_test_dataset:address") + ] + assert len(address_mapping) >= 5 + assert address_mapping["user.contact.address.street"] == [ + FieldPath("house"), + FieldPath("street"), + ] + product_mapping = data_category_field_mapping[ + CollectionAddress.from_string("postgres_example_test_dataset:product") + ] + assert len(product_mapping) >= 1 + assert product_mapping["system.operations"] == [ + FieldPath( + "id", + ), + FieldPath( + "name", + ), + FieldPath( + "price", + ), + ] + + +@pytest.mark.integration_postgres +@pytest.mark.integration +@mock.patch("fides.api.service.privacy_request.request_runner_service.upload") +def test_upload_access_results_has_data_use_map( + upload_mock: Mock, + 
postgres_example_test_dataset_config_read_access, + postgres_integration_db, + db, + policy, + run_privacy_request_task, +): + """ + Ensure we are passing along a correctly populated data_use_map to the 'upload' function + that publishes the access request output. + """ + customer_email = "customer-1@example.com" + data = { + "requested_at": "2021-08-30T16:09:37.359Z", + "policy_key": policy.key, + "identity": {"email": customer_email}, + } + + pr = get_privacy_request_results( + db, + policy, + run_privacy_request_task, + data, + ) + + # sanity check that access results returned as expected + results = pr.get_results() + assert len(results.keys()) == 11 + + # what we're really testing - ensure data_use_map arg is well-populated + args, kwargs = upload_mock.call_args + data_use_map = kwargs["data_use_map"] + + assert data_use_map == { + "postgres_example_test_dataset:report": "{'marketing.advertising'}", + "postgres_example_test_dataset:employee": "{'marketing.advertising'}", + "postgres_example_test_dataset:customer": "{'marketing.advertising'}", + "postgres_example_test_dataset:service_request": "{'marketing.advertising'}", + "postgres_example_test_dataset:visit": "{'marketing.advertising'}", + "postgres_example_test_dataset:address": "{'marketing.advertising'}", + "postgres_example_test_dataset:login": "{'marketing.advertising'}", + "postgres_example_test_dataset:orders": "{'marketing.advertising'}", + "postgres_example_test_dataset:payment_card": "{'marketing.advertising'}", + "postgres_example_test_dataset:order_item": "{'marketing.advertising'}", + "postgres_example_test_dataset:product": "{'marketing.advertising'}", + } + + +@pytest.mark.integration_postgres +@pytest.mark.integration +@mock.patch("fides.api.models.privacy_request.PrivacyRequest.trigger_policy_webhook") diff --git a/tests/ops/service/test_storage_uploader_service.py b/tests/ops/service/test_storage_uploader_service.py index 82298a4bf6..ad6beaf700 100644 --- 
a/tests/ops/service/test_storage_uploader_service.py +++ b/tests/ops/service/test_storage_uploader_service.py @@ -78,6 +78,8 @@ def test_uploader_s3_success_secrets_auth( "json", privacy_request, S3AuthMethod.SECRET_KEYS.value, + None, + None, ) storage_config.delete(db) @@ -183,6 +185,8 @@ def test_uploader_s3_success_automatic_auth( "json", privacy_request, S3AuthMethod.AUTOMATIC.value, + None, + None, ) storage_config.delete(db) diff --git a/tests/ops/task/test_graph_task.py b/tests/ops/task/test_graph_task.py index 1b58c83b49..a4e14b5789 100644 --- a/tests/ops/task/test_graph_task.py +++ b/tests/ops/task/test_graph_task.py @@ -1,8 +1,10 @@ from typing import Any, Dict from unittest import mock +from uuid import uuid4 import pytest from bson import ObjectId +from fideslang.models import Dataset from fides.api.common_exceptions import SkippingConsentPropagation from fides.api.graph.config import ( @@ -15,20 +17,28 @@ ) from fides.api.graph.graph import DatasetGraph from fides.api.graph.traversal import Traversal, TraversalNode -from fides.api.models.connectionconfig import ConnectionConfig, ConnectionType +from fides.api.models.connectionconfig import ( + AccessLevel, + ConnectionConfig, + ConnectionType, +) +from fides.api.models.datasetconfig import DatasetConfig from fides.api.models.policy import Policy, Rule, RuleTarget from fides.api.models.privacy_request import ExecutionLog, ExecutionLogStatus +from fides.api.models.sql_models import Dataset as CtlDataset from fides.api.schemas.policy import ActionType from fides.api.task.graph_task import ( EMPTY_REQUEST, GraphTask, TaskResources, _evaluate_erasure_dependencies, + _format_data_use_map_for_caching, build_affected_field_logs, collect_queries, start_function, update_erasure_mapping_from_cache, ) +from fides.api.task.task_resources import Connections from fides.api.util.consent_util import ( cache_initial_status_and_identities_for_consent_reporting, ) @@ -586,9 +596,9 @@ def 
test_multiple_rules_targeting_same_field(self, node_fixture): class TestUpdateErasureMappingFromCache: @pytest.fixture(scope="function") - def task_resource(self, privacy_request, policy, db): + def task_resource(self, privacy_request, policy, db, connection_config): tr = TaskResources(privacy_request, policy, [], db) - tr.get_connector = lambda x: True + tr.get_connector = lambda x: Connections.build_connector(connection_config) return tr @pytest.fixture(scope="function") @@ -669,6 +679,211 @@ def test_update_erasure_mapping_from_cache_with_data(self, dsk, task_resource): assert dsk[CollectionAddress("dr_1", "ds_1")] == 1 +class TestFormatDataUseMapForCaching: + def create_dataset(self, db, fides_key, connection_config): + """ + Util to create dataset and dataset config used in fixtures + """ + ds = Dataset( + fides_key=fides_key, + organization_fides_key="default_organization", + name="Postgres Example Subscribers Dataset", + collections=[ + { + "name": "subscriptions", + "fields": [ + { + "name": "email", + "data_categories": ["user.contact.email"], + "fidesops_meta": { + "identity": "email", + }, + }, + ], + }, + ], + ) + ctl_dataset = CtlDataset(**ds.dict()) + + db.add(ctl_dataset) + db.commit() + dataset_config = DatasetConfig.create( + db=db, + data={ + "connection_config_id": connection_config.id, + "fides_key": fides_key, + "ctl_dataset_id": ctl_dataset.id, + }, + ) + return ctl_dataset, dataset_config + + @pytest.fixture(scope="function") + def connection_config_no_system(self, db): + """Connection config used for data_use_map testing, not associated with a system""" + connection_config = ConnectionConfig.create( + db=db, + data={ + "name": str(uuid4()), + "key": "connection_config_data_use_map_no_system", + "connection_type": ConnectionType.manual, + "access": AccessLevel.write, + "disabled": False, + }, + ) + + ctl_dataset, dataset_config = self.create_dataset( + db, "postgres_example_subscriptions_dataset_no_system", connection_config + ) + + yield 
connection_config + dataset_config.delete(db) + ctl_dataset.delete(db) + connection_config.delete(db) + + @pytest.fixture(scope="function") + def connection_config_system(self, db, system): + """Connection config used for data_use_map testing, associated with a system""" + connection_config = ConnectionConfig.create( + db=db, + data={ + "name": str(uuid4()), + "key": "connection_config_data_use_map", + "connection_type": ConnectionType.manual, + "access": AccessLevel.write, + "disabled": False, + "system_id": system.id, + }, + ) + + ctl_dataset, dataset_config = self.create_dataset( + db, "postgres_example_subscriptions_dataset", connection_config + ) + + yield connection_config + dataset_config.delete(db) + ctl_dataset.delete(db) + connection_config.delete(db) + + @pytest.fixture(scope="function") + def connection_config_system_multiple_decs(self, db, system_multiple_decs): + """ + Connection config used for data_use_map testing, associated with a system + that has multiple privacy declarations and data uses + """ + connection_config = ConnectionConfig.create( + db=db, + data={ + "name": str(uuid4()), + "key": "connection_config_data_use_map_system_multiple_decs", + "connection_type": ConnectionType.manual, + "access": AccessLevel.write, + "disabled": False, + "system_id": system_multiple_decs.id, + }, + ) + + ctl_dataset, dataset_config = self.create_dataset( + db, + "postgres_example_subscriptions_dataset_multiple_decs", + connection_config, + ) + + yield connection_config + dataset_config.delete(db) + ctl_dataset.delete(db) + connection_config.delete(db) + + @pytest.mark.parametrize( + "connection_config_fixtures,expected_data_use_map", + [ + ( + [ + "connection_config_no_system" + ], # connection config no system, no data uses + {"postgres_example_subscriptions_dataset_no_system:subscriptions": {}}, + ), + ( + [ + "connection_config_system" + ], # connection config associated with system and therefore data uses + { + 
"postgres_example_subscriptions_dataset:subscriptions": { + "marketing.advertising" + }, + }, + ), + ( + [ + "connection_config_system_multiple_decs" + ], # system has multiple declarations, multiple data uses + { + "postgres_example_subscriptions_dataset_multiple_decs:subscriptions": { + "marketing.advertising", + "third_party_sharing", + }, + }, + ), + ( + [ # ensure map is populated correctly with multiple systems + "connection_config_no_system", + "connection_config_system_multiple_decs", + ], + { + "postgres_example_subscriptions_dataset_no_system:subscriptions": {}, + "postgres_example_subscriptions_dataset_multiple_decs:subscriptions": { + "marketing.advertising", + "third_party_sharing", + }, + }, + ), + ], + ) + def test_data_use_map( + self, + connection_config_fixtures, + expected_data_use_map, + db, + privacy_request, + policy, + request, + ): + """ + Unit tests that confirm the output from function used to generate + the `Collection` -> `DataUse` map that's cached during access request execution. 
+ """ + + # load connection config fixtures + connection_configs = [] + for config_fixture in connection_config_fixtures: + connection_configs.append(request.getfixturevalue(config_fixture)) + + # create a sample traversal with our current dataset state + datasets = DatasetConfig.all(db=db) + dataset_graphs = [dataset_config.get_graph() for dataset_config in datasets] + dataset_graph = DatasetGraph(*dataset_graphs) + traversal: Traversal = Traversal( + dataset_graph, {"email": {"test_user@example.com"}} + ) + env: Dict[CollectionAddress, Any] = {} + task_resources = TaskResources(privacy_request, policy, connection_configs, db) + + # perform the traversal to populate our `env` dict + def collect_tasks_fn( + tn: TraversalNode, data: Dict[CollectionAddress, GraphTask] + ) -> None: + """Run the traversal, as an action creating a GraphTask for each traversal_node.""" + if not tn.is_root_node(): + data[tn.address] = GraphTask(tn, task_resources) + + traversal.traverse( + env, + collect_tasks_fn, + ) + + # ensure that the generated data_use_map looks as expected based on `env` dict + assert _format_data_use_map_for_caching(env) == expected_data_use_map + + class TestGraphTaskAffectedConsentSystems: @pytest.fixture() def mock_graph_task(