
Commit

Code review improvements
erosselli committed Sep 4, 2024
1 parent 713ec6a commit 61dc048
Showing 8 changed files with 173 additions and 111 deletions.
@@ -1,8 +1,8 @@
"""Add dynamic erasure email connector type
Revision ID: 9de4bb76307a
Revises: 9cad5a5c438c
Create Date: 2024-08-16 18:36:35.020140
Revises: eef4477c37d0
Create Date: 2024-09-04 11:36:35.020140
"""

@@ -11,7 +11,7 @@

# revision identifiers, used by Alembic.
revision = "9de4bb76307a"
down_revision = "9cad5a5c438c"
down_revision = "eef4477c37d0"
branch_labels = None
depends_on = None

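For context, a hedged sketch of how Alembic consumes the identifiers changed above: down_revision names the migration this one now follows, so updating it (together with the Revises: note) re-parents the revision onto the new head. The revision ids below are the ones shown in this diff; the upgrade/downgrade bodies are placeholders, not the real migration.

# Minimal Alembic revision skeleton illustrating the identifiers above.
revision = "9de4bb76307a"       # this migration's id (unchanged)
down_revision = "eef4477c37d0"  # its parent, re-pointed in this commit
branch_labels = None
depends_on = None


def upgrade() -> None:
    # Placeholder: the real migration adds the dynamic erasure email connector type.
    pass


def downgrade() -> None:
    # Placeholder: reverses whatever upgrade() applies.
    pass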
14 changes: 13 additions & 1 deletion src/fides/api/graph/config.py
@@ -500,7 +500,19 @@ def identities(self) -> Dict[FieldPath, str]:
def custom_request_fields(self) -> Dict[FieldPath, str]:
"""
Return custom request fields included in the table,
i.e fields whose values may come in a custom request field on a DSR
i.e fields whose values may come in a custom request field on a DSR.
E.g if the collection is defined like:
- name: publishers
- fields:
- name: id
fides_meta:
identity: true
- name: site_id
fides_meta:
custom_request_field: tenant_id
Then this returns a dictionary of the form {FieldPath("site_id"): "tenant_id"}
"""
return {
field_path: field.custom_request_field
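To make the custom_request_fields docstring above concrete, here is a minimal, self-contained sketch of the mapping it describes. The stub class is a simplified stand-in, not the real fides field model, and the real property returns FieldPath keys rather than plain strings.

from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class FieldStub:
    """Simplified stand-in for a collection field and its fides_meta."""

    name: str
    identity: bool = False
    custom_request_field: Optional[str] = None


def custom_request_fields(fields: Dict[str, FieldStub]) -> Dict[str, str]:
    # Keep only fields that declare a custom_request_field,
    # keyed by the field's path within the collection.
    return {
        path: field.custom_request_field
        for path, field in fields.items()
        if field.custom_request_field
    }


# The "publishers" collection from the docstring example:
publishers = {
    "id": FieldStub("id", identity=True),
    "site_id": FieldStub("site_id", custom_request_field="tenant_id"),
}

print(custom_request_fields(publishers))  # -> {'site_id': 'tenant_id'}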
@@ -34,39 +34,39 @@ class DynamicErasureEmailSchema(BaseEmailSchema):
}, # metadata added so we can identify these secret schema fields as external references
)

@staticmethod
def validate_dataset_references(
    db: Session,
    connection_secrets: "DynamicErasureEmailSchema",
) -> Any:
    """
    Validates that the provided connection secrets reference an existing datasetconfig collection,
    and that both third_party_vendor_name and recipient_email_address reference the same collection
    from the same dataset. If any of these conditions are not met, a ValidationError is raised.
    """
    validate_dataset_reference(db, connection_secrets.recipient_email_address)
    validate_dataset_reference(db, connection_secrets.third_party_vendor_name)

    if (
        connection_secrets.recipient_email_address.dataset
        != connection_secrets.third_party_vendor_name.dataset
    ):
        raise ValidationError(
            "Recipient email address and third party vendor name must reference the same dataset"
        )

    email_collection_name = connection_secrets.recipient_email_address.field.split(
        "."
    )[0]
    vendor_collection_name = connection_secrets.third_party_vendor_name.field.split(
        "."
    )[0]

    if email_collection_name != vendor_collection_name:
        raise ValidationError(
            "Recipient email address and third party vendor name must reference the same collection"
        )


class DynamicErasureEmailDocsSchema(DynamicErasureEmailSchema, NoValidationSchema):
    """DynamicErasureEmailDocsSchema Secrets Schema for API Docs"""


def validate_dynamic_erasure_email_dataset_references(
    db: Session,
    connection_secrets: "DynamicErasureEmailSchema",
) -> Any:
    """
    Validates that the provided connection secrets reference an existing datasetconfig collection,
    and that both third_party_vendor_name and recipient_email_address reference the same collection
    from the same dataset. If any of these conditions are not met, a ValidationError is raised.
    """
    validate_dataset_reference(db, connection_secrets.recipient_email_address)
    validate_dataset_reference(db, connection_secrets.third_party_vendor_name)

    if (
        connection_secrets.recipient_email_address.dataset
        != connection_secrets.third_party_vendor_name.dataset
    ):
        raise ValidationError(
            "Recipient email address and third party vendor name must reference the same dataset"
        )

    email_collection_name = connection_secrets.recipient_email_address.field.split(".")[
        0
    ]
    vendor_collection_name = connection_secrets.third_party_vendor_name.field.split(
        "."
    )[0]

    if email_collection_name != vendor_collection_name:
        raise ValidationError(
            "Recipient email address and third party vendor name must reference the same collection"
        )
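To illustrate the two checks in validate_dynamic_erasure_email_dataset_references above, a small sketch with hypothetical reference values (the real ones come from the connector's saved secrets): both references must name the same dataset and the same collection, where the collection is the first dot-separated segment of each field.

# Hypothetical dataset references mirroring FidesDatasetReference's
# `dataset` and `field` attributes.
recipient_email_address = {"dataset": "vendor_dataset", "field": "publishers.email_address"}
third_party_vendor_name = {"dataset": "vendor_dataset", "field": "publishers.vendor_name"}

# Check 1: both references must point at the same dataset.
assert recipient_email_address["dataset"] == third_party_vendor_name["dataset"]

# Check 2: both references must point at the same collection,
# i.e. the portion of the field path before the first ".".
email_collection_name = recipient_email_address["field"].split(".")[0]   # "publishers"
vendor_collection_name = third_party_vendor_name["field"].split(".")[0]  # "publishers"
assert email_collection_name == vendor_collection_name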
109 changes: 71 additions & 38 deletions src/fides/api/service/connectors/dynamic_erasure_email_connector.py
@@ -3,6 +3,7 @@

from fideslang.models import FidesDatasetReference
from loguru import logger
from sqlalchemy.exc import InternalError, OperationalError
from sqlalchemy.orm import Query, Session

from fides.api.common_exceptions import MessageDispatchException
@@ -38,6 +39,11 @@ class DynamicErasureEmailConnectorException(Exception):


class ProcessedConfig(NamedTuple):
"""
Helper type to make the code easier to read, returned by process_connector_config
and used by get_email_and_vendor_from_custom_request_fields.
"""

graph_dataset: GraphDataset
connector: BaseConnector
collection_address: CollectionAddress
@@ -63,11 +69,11 @@ def get_config(self, configuration: ConnectionConfig) -> DynamicErasureEmailSchema:

def batch_email_send(self, privacy_requests: Query) -> None:
logger.debug(
"Starting batch_email_send for connector: {} ...", self.configuration.name
"Starting batch_email_send for connector: {} ...", self.configuration.key
)

db: Session = Session.object_session(self.configuration)
proccessed_config = self.process_connector_config(db, privacy_requests)
processed_config = self.process_connector_config(db, privacy_requests)

skipped_privacy_requests: List[str] = []

@@ -80,7 +86,7 @@ def batch_email_send(self, privacy_requests: Query) -> None:
custom_field_data = (
self.get_email_and_vendor_from_custom_request_fields(
db,
proccessed_config,
processed_config,
privacy_request,
)
)
@@ -118,7 +124,7 @@ def batch_email_send(self, privacy_requests: Query) -> None:

except Exception as exc:
logger.error(
"An error occurred when retrieving email from custom request fields for connector {}. Skipping email send for privacy request with id {}. Error: {}",
"An error occurred when retrieving email from custom request fields for connector {}. Erasure email for privacy request with id {} not sent. Error: {}",
self.configuration.key,
privacy_request.id,
exc,
@@ -149,8 +155,17 @@ def batch_email_send(self, privacy_requests: Query) -> None:
test_mode=False,
)
except MessageDispatchException as exc:
logger.info("Erasure email failed with exception {}", exc)
raise
logger.error(
"Dynamic erasure email for connector {} failed with exception {}",
self.configuration.key,
exc,
)
self.error_all_privacy_requests(
db,
privacy_requests,
f"Dynamic erasure email for connector failed with MessageDispatchException. Error: {exc}",
)
raise exc

# create an audit event for each privacy request ID
for privacy_request in privacy_requests:
@@ -185,7 +200,7 @@ def get_collection_and_field_from_reference(

if len(split_field) < 2:
field = dataset_reference.field # pylint: disable=no-member
error_log = f"Invalid dataset reference field {field} for dataset {dataset_key}. Skipping erasure email send for connector: {self.configuration.key}."
error_log = f"Invalid dataset reference field {field} for dataset {dataset_key}. Failed to send dynamic erasure emails for connector: {self.configuration.key}."
logger.error(error_log)
self.error_all_privacy_requests(
db,
@@ -213,7 +228,7 @@ def process_connector_config(
"""
# Import here to avoid circular import error
# TODO: once we incorporate custom request fields into the DSR graph logic we won't need this import here,
# so ignorning cyclic import warning for now
# so ignoring cyclic import warning for now
from fides.api.service.connectors import ( # pylint: disable=cyclic-import
get_connector,
)
@@ -222,12 +237,12 @@
email_dataset_key = email_dataset_reference.dataset # pylint: disable=no-member

vendor_dataset_reference = self.config.third_party_vendor_name
vendor_datset_key = (
vendor_dataset_key = (
vendor_dataset_reference.dataset # pylint: disable=no-member
)

if email_dataset_key != vendor_datset_key:
error_log = f"Dynamic Erasure Email Connector with key {self.configuration.key} references different datasets for email and vendor fields. Skipping erasure email send."
if email_dataset_key != vendor_dataset_key:
error_log = f"Dynamic Erasure Email Connector with key {self.configuration.key} references different datasets for email and vendor fields. Erasure emails not sent."
logger.error(error_log)
self.error_all_privacy_requests(
db,
@@ -249,7 +264,7 @@
)

if collection_name != vendor_collection_name:
error_log = f"Dynamic Erasure Email Connector with key {self.configuration.key} references different collections for email and vendor fields. Skipping erasure email send."
error_log = f"Dynamic Erasure Email Connector with key {self.configuration.key} references different collections for email and vendor fields. Erasure emails not sent."
logger.error(error_log)
self.error_all_privacy_requests(
db,
@@ -264,12 +279,12 @@
)

if not dataset_config:
error_log = f"DatasetConfig with key {email_dataset_key} not found. Skipping erasure email send for connector: {self.configuration.key}."
error_log = f"DatasetConfig with key {email_dataset_key} not found. Failed to send dynamic erasure emails for connector: {self.configuration.key}."
logger.error(error_log)
self.error_all_privacy_requests(
db,
privacy_requests,
f"Connector configuration references DatasetConfig with key {email_dataset_key}, but such DatasetConfig was not found.",
f"Connector configuration references DatasetConfig with key {email_dataset_key}, but no such DatasetConfig was found.",
)

raise DynamicErasureEmailConnectorException(error_log)
@@ -278,27 +293,23 @@
graph_dataset: GraphDataset = dataset_config.get_graph()

# Get the corresponding ConnectionConfig instance
connection_config: ConnectionConfig = (
db.execute(
db.query(ConnectionConfig).filter(
ConnectionConfig.id == dataset_config.connection_config_id
)
)
.scalars()
connection_config = (
db.query(ConnectionConfig)
.filter(ConnectionConfig.id == dataset_config.connection_config_id)
.first()
)

# Instatiate the ConnectionConfig's connector so that we can execute the query
# Instantiate the ConnectionConfig's connector so that we can execute the query
# to retrieve the email addresses based on the custom request fields
connector = get_connector(connection_config)
connector = get_connector(connection_config) # type: ignore

# Search for the collection in the dataset that contains the custom request fields
collections = [
c for c in graph_dataset.collections if c.name == collection_name
]

if not collections:
error_log = f"Collection with name {collection_name} not found in dataset {graph_dataset.name}. Skipping erasure email send for connector: {self.configuration.key}."
error_log = f"Collection with name {collection_name} not found in dataset {graph_dataset.name}. Failed to send dynamic erasure emails for connector: {self.configuration.key}."
logger.error(error_log)
self.error_all_privacy_requests(
db,
@@ -400,19 +411,41 @@ def get_email_and_vendor_from_custom_request_fields(
for dsr_field, dsr_field_data in custom_request_fields.items()
}

# We execute a standalone retrieval query on the connector to get the email address
# based on the custom request fields on the privacy request and the field_name
# provided as part of the connection config.
# Following the previous example, this query would be something like
# SELECT email_address FROM site_info WHERE site_id = '1234'
retrieved_data = processed_config.connector.execute_standalone_retrieval_query(
node=execution_node,
fields=[processed_config.email_field, processed_config.vendor_field],
filters=query_filters,
)
try:
# We execute a standalone retrieval query on the connector to get the email address
# based on the custom request fields on the privacy request and the field_name
# provided as part of the connection config.
# Following the previous example, this query would be something like
# SELECT email_address FROM site_info WHERE site_id = '1234'
retrieved_data = (
processed_config.connector.execute_standalone_retrieval_query(
node=execution_node,
fields=[
processed_config.email_field,
processed_config.vendor_field,
],
filters=query_filters,
)
)
except OperationalError:
error_log = f"An OperationalError occurred when retrieving email from custom request fields for connector {self.configuration.key}. Erasure email for privacy request with id {privacy_request.id} not sent."
logger.error(error_log)
self.error_privacy_request(db, privacy_request, error_log)
return None
except InternalError:
error_log = f"An InternalError occurred when retrieving email from custom request fields for connector {self.configuration.key}. Erasure email for privacy request with id {privacy_request.id} not sent."
logger.error(error_log)
self.error_privacy_request(db, privacy_request, error_log)
return None
except Exception:
error_log = f"An error occurred when retrieving email from custom request fields for connector {self.configuration.key}. Erasure email for privacy request with id {privacy_request.id} not sent."
logger.error(error_log)
self.error_privacy_request(db, privacy_request, error_log)
return None

if not retrieved_data:
logger.error(
"Custom request field lookup yielded no results. Skipping email send for privacy request with id {}.",
"Custom request field lookup yielded no results. Erasure email for privacy request with id {} not sent.",
privacy_request.id,
)
self.error_privacy_request(
@@ -424,7 +457,7 @@

if len(retrieved_data) > 1:
logger.error(
"Custom request field lookup yielded multiple results. Skipping email send for privacy request with id {}.",
"Custom request field lookup yielded multiple results. Erasure email for privacy request with id {} not sent.",
privacy_request.id,
)
self.error_privacy_request(
@@ -486,13 +519,13 @@ def test_connection(self) -> Optional[ConnectionTestStatus]:
try:
if not self.config.test_email_address:
raise MessageDispatchException(
f"Cannot test connection. No test email defined for {self.configuration.name}"
f"Cannot test connection. No test email defined for {self.configuration.key}"
)
# synchronous for now since failure to send is considered a connection test failure
send_single_erasure_email(
db=db,
subject_email=self.config.test_email_address,
subject_name="Test Vendor", # Needs to be a string since vendor depends con custom request field values
subject_name="Test Vendor", # Needs to be a string since vendor depends on custom request field values
batch_identities=list(self.identities_for_test_email.values()),
test_mode=True,
)
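To make the lookup described in get_email_and_vendor_from_custom_request_fields concrete: the connector filters the configured collection by the privacy request's custom request field values and expects exactly one row, from which it reads the email address and vendor name. A hedged, dictionary-based sketch of that flow (the table name, column names, and rows are hypothetical; the real code goes through the dataset's connector via execute_standalone_retrieval_query):

from typing import Dict, List, Optional, Tuple

# Hypothetical rows of a "site_info"-style collection referenced by the
# connection config; in fides this data lives behind the dataset's connector.
ROWS = [
    {"site_id": "1234", "email_address": "privacy@vendor-a.example", "vendor_name": "Vendor A"},
    {"site_id": "5678", "email_address": "privacy@vendor-b.example", "vendor_name": "Vendor B"},
]


def lookup_email_and_vendor(
    query_filters: Dict[str, List[str]],
    email_field: str,
    vendor_field: str,
) -> Optional[Tuple[str, str]]:
    """Roughly: SELECT email_address, vendor_name FROM site_info WHERE site_id IN (...)."""
    matches = [
        row
        for row in ROWS
        if all(row.get(field) in values for field, values in query_filters.items())
    ]
    if len(matches) != 1:
        # The connector errors the privacy request when the lookup yields
        # no results or multiple results, and skips the email send.
        return None
    return matches[0][email_field], matches[0][vendor_field]


# Custom request field value from the DSR (e.g. tenant_id "1234" mapped to site_id).
print(lookup_email_and_vendor({"site_id": ["1234"]}, "email_address", "vendor_name"))
# -> ('privacy@vendor-a.example', 'Vendor A')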