diff --git a/src/fides/api/alembic/migrations/versions/9de4bb76307a_add_dynamic_erasure_email_connector_type.py b/src/fides/api/alembic/migrations/versions/9de4bb76307a_add_dynamic_erasure_email_connector_type.py index 98c43d09be..8facec96af 100644 --- a/src/fides/api/alembic/migrations/versions/9de4bb76307a_add_dynamic_erasure_email_connector_type.py +++ b/src/fides/api/alembic/migrations/versions/9de4bb76307a_add_dynamic_erasure_email_connector_type.py @@ -1,8 +1,8 @@ """Add dynamic erasure email connector type Revision ID: 9de4bb76307a -Revises: 9cad5a5c438c -Create Date: 2024-08-16 18:36:35.020140 +Revises: eef4477c37d0 +Create Date: 2024-09-04 11:36:35.020140 """ @@ -11,7 +11,7 @@ # revision identifiers, used by Alembic. revision = "9de4bb76307a" -down_revision = "9cad5a5c438c" +down_revision = "eef4477c37d0" branch_labels = None depends_on = None diff --git a/src/fides/api/graph/config.py b/src/fides/api/graph/config.py index 461ace704b..409aed783c 100644 --- a/src/fides/api/graph/config.py +++ b/src/fides/api/graph/config.py @@ -500,7 +500,19 @@ def identities(self) -> Dict[FieldPath, str]: def custom_request_fields(self) -> Dict[FieldPath, str]: """ Return custom request fields included in the table, - i.e fields whose values may come in a custom request field on a DSR + i.e fields whose values may come in a custom request field on a DSR. 
+ + E.g if the collection is defined like: + - name: publishers + - fields: + - name: id + fides_meta: + identity: true + - name: site_id + fides_meta: + custom_request_field: tenant_id + + Then this returns a dictionary of the form {FieldPath("site_id"): "tenant_id"} """ return { field_path: field.custom_request_field diff --git a/src/fides/api/schemas/connection_configuration/connection_secrets_dynamic_erasure_email.py b/src/fides/api/schemas/connection_configuration/connection_secrets_dynamic_erasure_email.py index 80d19f454c..e9e4e8cd7b 100644 --- a/src/fides/api/schemas/connection_configuration/connection_secrets_dynamic_erasure_email.py +++ b/src/fides/api/schemas/connection_configuration/connection_secrets_dynamic_erasure_email.py @@ -34,39 +34,39 @@ class DynamicErasureEmailSchema(BaseEmailSchema): }, # metadata added so we can identify these secret schema fields as external references ) - @staticmethod - def validate_dataset_references( - db: Session, - connection_secrets: "DynamicErasureEmailSchema", - ) -> Any: - """ - Validates that the provided connection secrets reference an existing datasetconfig collection, - and that both third_party_vendor_name and recipient_email_address reference the same collection - from the same dataset. If any of these conditions are not met, a ValidationError is raised. - """ - validate_dataset_reference(db, connection_secrets.recipient_email_address) - validate_dataset_reference(db, connection_secrets.third_party_vendor_name) - if ( - connection_secrets.recipient_email_address.dataset - != connection_secrets.third_party_vendor_name.dataset - ): - raise ValidationError( - "Recipient email address and third party vendor name must reference the same dataset" - ) +class DynamicErasureEmailDocsSchema(DynamicErasureEmailSchema, NoValidationSchema): + """DynamicErasureEmailDocsSchema Secrets Schema for API Docs""" - email_collection_name = connection_secrets.recipient_email_address.field.split( - "." 
- )[0] - vendor_collection_name = connection_secrets.third_party_vendor_name.field.split( - "." - )[0] - if email_collection_name != vendor_collection_name: - raise ValidationError( - "Recipient email address and third party vendor name must reference the same collection" - ) +def validate_dynamic_erasure_email_dataset_references( + db: Session, + connection_secrets: "DynamicErasureEmailSchema", +) -> Any: + """ + Validates that the provided connection secrets reference an existing datasetconfig collection, + and that both third_party_vendor_name and recipient_email_address reference the same collection + from the same dataset. If any of these conditions are not met, a ValidationError is raised. + """ + validate_dataset_reference(db, connection_secrets.recipient_email_address) + validate_dataset_reference(db, connection_secrets.third_party_vendor_name) + if ( + connection_secrets.recipient_email_address.dataset + != connection_secrets.third_party_vendor_name.dataset + ): + raise ValidationError( + "Recipient email address and third party vendor name must reference the same dataset" + ) -class DynamicErasureEmailDocsSchema(DynamicErasureEmailSchema, NoValidationSchema): - """DynamicErasureEmailDocsSchema Secrets Schema for API Docs""" + email_collection_name = connection_secrets.recipient_email_address.field.split(".")[ + 0 + ] + vendor_collection_name = connection_secrets.third_party_vendor_name.field.split( + "." 
+ )[0] + + if email_collection_name != vendor_collection_name: + raise ValidationError( + "Recipient email address and third party vendor name must reference the same collection" + ) diff --git a/src/fides/api/service/connectors/dynamic_erasure_email_connector.py b/src/fides/api/service/connectors/dynamic_erasure_email_connector.py index ea24497600..fda149e689 100644 --- a/src/fides/api/service/connectors/dynamic_erasure_email_connector.py +++ b/src/fides/api/service/connectors/dynamic_erasure_email_connector.py @@ -3,6 +3,7 @@ from fideslang.models import FidesDatasetReference from loguru import logger +from sqlalchemy.exc import InternalError, OperationalError from sqlalchemy.orm import Query, Session from fides.api.common_exceptions import MessageDispatchException @@ -38,6 +39,11 @@ class DynamicErasureEmailConnectorException(Exception): class ProcessedConfig(NamedTuple): + """ + Helper type to make the code easier to read, returned by process_connector_config + and used by get_email_and_vendor_from_custom_request_fields. 
+ """ + graph_dataset: GraphDataset connector: BaseConnector collection_address: CollectionAddress @@ -63,11 +69,11 @@ def get_config(self, configuration: ConnectionConfig) -> DynamicErasureEmailSche def batch_email_send(self, privacy_requests: Query) -> None: logger.debug( - "Starting batch_email_send for connector: {} ...", self.configuration.name + "Starting batch_email_send for connector: {} ...", self.configuration.key ) db: Session = Session.object_session(self.configuration) - proccessed_config = self.process_connector_config(db, privacy_requests) + processed_config = self.process_connector_config(db, privacy_requests) skipped_privacy_requests: List[str] = [] @@ -80,7 +86,7 @@ def batch_email_send(self, privacy_requests: Query) -> None: custom_field_data = ( self.get_email_and_vendor_from_custom_request_fields( db, - proccessed_config, + processed_config, privacy_request, ) ) @@ -118,7 +124,7 @@ def batch_email_send(self, privacy_requests: Query) -> None: except Exception as exc: logger.error( - "An error occurred when retrieving email from custom request fields for connector {}. Skipping email send for privacy request with id {}. Error: {}", + "An error occurred when retrieving email from custom request fields for connector {}. Erasure email for privacy request with id {} not sent. Error: {}", self.configuration.key, privacy_request.id, exc, @@ -149,8 +155,17 @@ def batch_email_send(self, privacy_requests: Query) -> None: test_mode=False, ) except MessageDispatchException as exc: - logger.info("Erasure email failed with exception {}", exc) - raise + logger.error( + "Dynamic erasure email for connector {} failed with exception {}", + self.configuration.key, + exc, + ) + self.error_all_privacy_requests( + db, + privacy_requests, + f"Dynamic erasure email for connector failed with MessageDispatchException. 
Error: {exc}", + ) + raise exc # create an audit event for each privacy request ID for privacy_request in privacy_requests: @@ -185,7 +200,7 @@ def get_collection_and_field_from_reference( if len(split_field) < 2: field = dataset_reference.field # pylint: disable=no-member - error_log = f"Invalid dataset reference field {field} for dataset {dataset_key}. Skipping erasure email send for connector: {self.configuration.key}." + error_log = f"Invalid dataset reference field {field} for dataset {dataset_key}. Failed to send dynamic erasure emails for connector: {self.configuration.key}." logger.error(error_log) self.error_all_privacy_requests( db, @@ -213,7 +228,7 @@ def process_connector_config( """ # Import here to avoid circular import error # TODO: once we incorporate custom request fields into the DSR graph logic we won't need this import here, - # so ignorning cyclic import warning for now + # so ignoring cyclic import warning for now from fides.api.service.connectors import ( # pylint: disable=cyclic-import get_connector, ) @@ -222,12 +237,12 @@ def process_connector_config( email_dataset_key = email_dataset_reference.dataset # pylint: disable=no-member vendor_dataset_reference = self.config.third_party_vendor_name - vendor_datset_key = ( + vendor_dataset_key = ( vendor_dataset_reference.dataset # pylint: disable=no-member ) - if email_dataset_key != vendor_datset_key: - error_log = f"Dynamic Erasure Email Connector with key {self.configuration.key} references different datasets for email and vendor fields. Skipping erasure email send." + if email_dataset_key != vendor_dataset_key: + error_log = f"Dynamic Erasure Email Connector with key {self.configuration.key} references different datasets for email and vendor fields. Erasure emails not sent." 
logger.error(error_log) self.error_all_privacy_requests( db, @@ -249,7 +264,7 @@ def process_connector_config( ) if collection_name != vendor_collection_name: - error_log = f"Dynamic Erasure Email Connector with key {self.configuration.key} references different collections for email and vendor fields. Skipping erasure email send." + error_log = f"Dynamic Erasure Email Connector with key {self.configuration.key} references different collections for email and vendor fields. Erasure emails not sent." logger.error(error_log) self.error_all_privacy_requests( db, @@ -264,12 +279,12 @@ def process_connector_config( ) if not dataset_config: - error_log = f"DatasetConfig with key {email_dataset_key} not found. Skipping erasure email send for connector: {self.configuration.key}." + error_log = f"DatasetConfig with key {email_dataset_key} not found. Failed to send dynamic erasure emails for connector: {self.configuration.key}." logger.error(error_log) self.error_all_privacy_requests( db, privacy_requests, - f"Connector configuration references DatasetConfig with key {email_dataset_key}, but such DatasetConfig was not found.", + f"Connector configuration references DatasetConfig with key {email_dataset_key}, but no such DatasetConfig was found.", ) raise DynamicErasureEmailConnectorException(error_log) @@ -278,19 +293,15 @@ def process_connector_config( graph_dataset: GraphDataset = dataset_config.get_graph() # Get the corresponding ConnectionConfig instance - connection_config: ConnectionConfig = ( - db.execute( - db.query(ConnectionConfig).filter( - ConnectionConfig.id == dataset_config.connection_config_id - ) - ) - .scalars() + connection_config = ( + db.query(ConnectionConfig) + .filter(ConnectionConfig.id == dataset_config.connection_config_id) .first() ) - # Instatiate the ConnectionConfig's connector so that we can execute the query + # Instantiate the ConnectionConfig's connector so that we can execute the query # to retrieve the email addresses based on the custom 
request fields - connector = get_connector(connection_config) + connector = get_connector(connection_config) # type: ignore # Search for the collection in the dataset that contains the custom request fields collections = [ @@ -298,7 +309,7 @@ def process_connector_config( ] if not collections: - error_log = f"Collection with name {collection_name} not found in dataset {graph_dataset.name}. Skipping erasure email send for connector: {self.configuration.key}." + error_log = f"Collection with name {collection_name} not found in dataset {graph_dataset.name}. Failed to send dynamic erasure emails for connector: {self.configuration.key}." logger.error(error_log) self.error_all_privacy_requests( db, @@ -400,19 +411,41 @@ def get_email_and_vendor_from_custom_request_fields( for dsr_field, dsr_field_data in custom_request_fields.items() } - # We execute a standalone retrieval query on the connector to get the email address - # based on the custom request fields on the privacy request and the field_name - # provided as part of the connection config. - # Following the previous example, this query would be something like - # SELECT email_address FROM site_info WHERE site_id = '1234' - retrieved_data = processed_config.connector.execute_standalone_retrieval_query( - node=execution_node, - fields=[processed_config.email_field, processed_config.vendor_field], - filters=query_filters, - ) + try: + # We execute a standalone retrieval query on the connector to get the email address + # based on the custom request fields on the privacy request and the field_name + # provided as part of the connection config. 
+ # Following the previous example, this query would be something like + # SELECT email_address FROM site_info WHERE site_id = '1234' + retrieved_data = ( + processed_config.connector.execute_standalone_retrieval_query( + node=execution_node, + fields=[ + processed_config.email_field, + processed_config.vendor_field, + ], + filters=query_filters, + ) + ) + except OperationalError: + error_log = f"An OperationalError occurred when retrieving email from custom request fields for connector {self.configuration.key}. Erasure email for privacy request with id {privacy_request.id} not sent." + logger.error(error_log) + self.error_privacy_request(db, privacy_request, error_log) + return None + except InternalError: + error_log = f"An InternalError occurred when retrieving email from custom request fields for connector {self.configuration.key}. Erasure email for privacy request with id {privacy_request.id} not sent." + logger.error(error_log) + self.error_privacy_request(db, privacy_request, error_log) + return None + except Exception: + error_log = f"An error occurred when retrieving email from custom request fields for connector {self.configuration.key}. Erasure email for privacy request with id {privacy_request.id} not sent." + logger.error(error_log) + self.error_privacy_request(db, privacy_request, error_log) + return None + if not retrieved_data: logger.error( - "Custom request field lookup yielded no results. Skipping email send for privacy request with id {}.", + "Custom request field lookup yielded no results. Erasure email for privacy request with id {} not sent.", privacy_request.id, ) self.error_privacy_request( @@ -424,7 +457,7 @@ def get_email_and_vendor_from_custom_request_fields( if len(retrieved_data) > 1: logger.error( - "Custom request field lookup yielded multiple results. Skipping email send for privacy request with id {}.", + "Custom request field lookup yielded multiple results. 
Erasure email for privacy request with id {} not sent.", privacy_request.id, ) self.error_privacy_request( @@ -486,13 +519,13 @@ def test_connection(self) -> Optional[ConnectionTestStatus]: try: if not self.config.test_email_address: raise MessageDispatchException( - f"Cannot test connection. No test email defined for {self.configuration.name}" + f"Cannot test connection. No test email defined for {self.configuration.key}" ) # synchronous for now since failure to send is considered a connection test failure send_single_erasure_email( db=db, subject_email=self.config.test_email_address, - subject_name="Test Vendor", # Needs to be a string since vendor depends con custom request field values + subject_name="Test Vendor", # Needs to be a string since vendor depends on custom request field values batch_identities=list(self.identities_for_test_email.values()), test_mode=True, ) diff --git a/src/fides/api/service/connectors/query_config.py b/src/fides/api/service/connectors/query_config.py index 7047e4f27b..6664e138bb 100644 --- a/src/fides/api/service/connectors/query_config.py +++ b/src/fides/api/service/connectors/query_config.py @@ -373,6 +373,53 @@ def format_clause_for_query( ) -> str: """Returns clause formatted in the specific SQL dialect for the query""" + def generate_raw_query_without_tuples( + self, field_list: List[str], filters: Dict[str, List[Any]] + ) -> Optional[T]: + """ + Generate a raw query using the provided field_list and filters, + i.e a query of the form: + SELECT FROM WHERE + + Generates distinct key/val pairs for building the query string instead of a tuple. + + E.g. SQLQueryConfig uses 1 key as a tuple: + SELECT order_id,product_id,quantity FROM order_item WHERE order_id IN (:some-params-in-tuple) + which for some connectors gets interpreted as data types (mssql) or quotes (bigquery). 
+ + This method produces distinct keys for the query_str: + SELECT order_id,product_id,quantity FROM order_item WHERE order_id IN (:_in_stmt_generated_0, :_in_stmt_generated_1, :_in_stmt_generated_2) + + """ + clauses = [] + query_data: Dict[str, Tuple[Any, ...]] = {} + for field_name, field_value in filters.items(): + data = set(field_value) + if len(data) == 1: + clauses.append( + self.format_clause_for_query(field_name, "=", field_name) + ) + query_data[field_name] = data.pop() + elif len(data) > 1: + data_vals = list(data) + query_data_keys: List[str] = [] + for val in data_vals: + # appending "_in_stmt_generated_" (can be any arbitrary str) so that this name has less chance of conflicting with pre-existing column in table + query_data_name = ( + field_name + "_in_stmt_generated_" + str(data_vals.index(val)) + ) + query_data[query_data_name] = val + query_data_keys.append(self.format_query_data_name(query_data_name)) + operand = ", ".join(query_data_keys) + clauses.append(self.format_clause_for_query(field_name, "IN", operand)) + + if len(clauses) > 0: + formatted_fields = ", ".join(field_list) + query_str = self.get_formatted_query_string(formatted_fields, clauses) + return self.format_query_stmt(query_str, query_data) + + return None + def generate_query_without_tuples( # pylint: disable=R0914 self, input_data: Dict[str, List[Any]], @@ -391,41 +438,13 @@ def generate_query_without_tuples( # pylint: disable=R0914 filtered_data = self.node.typed_filtered_values(input_data) if filtered_data: - clauses = [] - query_data: Dict[str, Tuple[Any, ...]] = {} formatted_fields = self.format_fields_for_query( list(self.field_map().keys()) ) - field_list = ",".join(formatted_fields) - for string_path, data in filtered_data.items(): - data = set(data) - if len(data) == 1: - clauses.append( - self.format_clause_for_query(string_path, "=", string_path) - ) - query_data[string_path] = data.pop() - elif len(data) > 1: - data_vals = list(data) - query_data_keys: List[str] = [] - 
for val in data_vals: - # appending "_in_stmt_generated_" (can be any arbitrary str) so that this name has less change of conflicting with pre-existing column in table - query_data_name = ( - string_path - + "_in_stmt_generated_" - + str(data_vals.index(val)) - ) - query_data[query_data_name] = val - query_data_keys.append( - self.format_query_data_name(query_data_name) - ) - operand = ", ".join(query_data_keys) - clauses.append( - self.format_clause_for_query(string_path, "IN", operand) - ) - if len(clauses) > 0: - query_str = self.get_formatted_query_string(field_list, clauses) - return self.format_query_stmt(query_str, query_data) + return self.generate_raw_query_without_tuples( + formatted_fields, filtered_data + ) logger.warning( "There is not enough data to generate a valid query for {}", diff --git a/src/fides/api/util/connection_util.py b/src/fides/api/util/connection_util.py index 69098d3986..caf4b986f7 100644 --- a/src/fides/api/util/connection_util.py +++ b/src/fides/api/util/connection_util.py @@ -44,7 +44,7 @@ TestStatusMessage, ) from fides.api.schemas.connection_configuration.connection_secrets_dynamic_erasure_email import ( - DynamicErasureEmailSchema, + validate_dynamic_erasure_email_dataset_references, ) from fides.api.schemas.connection_configuration.connection_secrets_saas import ( validate_saas_secrets_external_references, @@ -144,9 +144,7 @@ def validate_secrets( # For dynamic erasure emails we must validate the recipient email address if connection_type == ConnectionType.dynamic_erasure_email: try: - DynamicErasureEmailSchema.validate_dataset_references( - db, connection_secrets - ) + validate_dynamic_erasure_email_dataset_references(db, connection_secrets) except FidesValidationError as e: raise HTTPException( status_code=HTTP_422_UNPROCESSABLE_ENTITY, detail=e.message diff --git a/tests/ops/api/v1/endpoints/test_connection_config_endpoints.py b/tests/ops/api/v1/endpoints/test_connection_config_endpoints.py index 46c17a2342..8ab908ce4d 
100644 --- a/tests/ops/api/v1/endpoints/test_connection_config_endpoints.py +++ b/tests/ops/api/v1/endpoints/test_connection_config_endpoints.py @@ -1681,7 +1681,7 @@ def test_put_dynamic_erasure_email_connection_config_invalid_secrets( is None ) - def test_put_dynamic_erasure_email_connection_config_mismtached_datasets( + def test_put_dynamic_erasure_email_connection_config_mismatched_datasets( self, url, api_client: TestClient, @@ -1735,7 +1735,7 @@ def test_put_dynamic_erasure_email_connection_config_mismtached_datasets( is None ) - def test_put_dynamic_erasure_email_connection_config_mismtached_collections( + def test_put_dynamic_erasure_email_connection_config_mismatched_collections( self, url, api_client: TestClient, diff --git a/tests/ops/service/connectors/test_queryconfig.py b/tests/ops/service/connectors/test_queryconfig.py index e1d8d8f29a..6cd1031804 100644 --- a/tests/ops/service/connectors/test_queryconfig.py +++ b/tests/ops/service/connectors/test_queryconfig.py @@ -759,7 +759,7 @@ def test_dry_run_query_with_data(self, complete_execution_node): dry_run_query = query_config.dry_run_query() assert ( dry_run_query - == "SELECT age,alternative_contacts,ascii_data,big_int_data,do_not_contact,double_data,duration,email,float_data,last_contacted,logins,name,states_lived,timestamp,user_id,uuid FROM users WHERE email = ? ALLOW FILTERING;" + == "SELECT age, alternative_contacts, ascii_data, big_int_data, do_not_contact, double_data, duration, email, float_data, last_contacted, logins, name, states_lived, timestamp, user_id, uuid FROM users WHERE email = ? ALLOW FILTERING;" ) def test_query_to_str(self, complete_execution_node):