Skip to content

Commit

Permalink
Read Acoustic Newsletters mapping from database (ref mozilla-it#256)
Browse files Browse the repository at this point in the history
  • Loading branch information
leplatrem committed Nov 15, 2022
1 parent 1f53cac commit 7463ef9
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 73 deletions.
62 changes: 22 additions & 40 deletions ctms/acoustic_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,31 +58,6 @@ def force_bytes(s, encoding="utf-8", strings_only=False, errors="strict"):


class AcousticResources:
MAIN_TABLE_SUBSCR_FLAGS = {
# maps the Basket/CTMS newsletter name to the Acoustic name
"about-mozilla": "sub_about_mozilla",
"app-dev": "sub_apps_and_hacks",
"common-voice": "sub_common_voice",
"firefox-accounts-journey": "sub_firefox_accounts_journey",
"firefox-news": "sub_firefox_news",
"firefox-sweepstakes": "sub_firefox_sweepstakes",
"hubs": "sub_hubs",
"internet-health-report": "sub_internet_health_report",
"knowledge-is-power": "sub_knowledge_is_power",
"miti": "sub_miti",
"mixed-reality": "sub_mixed_reality",
"mozilla-and-you": "sub_firefox_news",
"mozilla-fellowship-awardee-alumni": "sub_mozilla_fellowship_awardee_alumni",
"mozilla-festival": "sub_mozilla_festival",
"mozilla-foundation": "sub_mozilla_foundation",
"mozilla-rally": "sub_rally",
"mozilla-technology": "sub_mozilla_technology",
"mozillians-nda": "sub_mozillians_nda",
"security-privacy-news": "sub_security_privacy_news",
"take-action-for-the-internet": "sub_take_action_for_the_internet",
"test-pilot": "sub_test_pilot",
}

SKIP_FIELDS = set(
(
# Known skipped fields from CTMS
Expand Down Expand Up @@ -159,21 +134,21 @@ def __init__(
self.context: Dict[str, Union[str, int, List[str]]] = {}
self.metric_service = metric_service

def convert_ctms_to_acoustic(self, contact: ContactSchema, main_fields: set[str]):
def convert_ctms_to_acoustic(
self,
contact: ContactSchema,
main_fields: set[str],
newsletters_mapping: dict[str, str],
):
acoustic_main_table = self._main_table_converter(contact, main_fields)
newsletter_rows, acoustic_main_table = self._newsletter_converter(
acoustic_main_table, contact
acoustic_main_table, contact, newsletters_mapping
)
product_rows = self._product_converter(contact)
return acoustic_main_table, newsletter_rows, product_rows

def _main_table_converter(self, contact, main_fields):
acoustic_main_table = {
# populate with all the sub_flags set to false
# they'll get set to true below, as-needed
v: "0"
for v in AcousticResources.MAIN_TABLE_SUBSCR_FLAGS.values()
}
acoustic_main_table = {}
acceptable_subdicts = ["email", "amo", "fxa", "vpn_waitlist", "relay_waitlist"]
special_cases = {
("fxa", "fxa_id"): "fxa_id",
Expand Down Expand Up @@ -234,22 +209,28 @@ def fxa_created_date_string_to_datetime(self, inner_value):
self.context["fxa_created_date_converted"] = "skipped"
return inner_value

def _newsletter_converter(self, acoustic_main_table, contact):
def _newsletter_converter(self, acoustic_main_table, contact, newsletters_mapping):
# create the RT rows for the newsletter table in acoustic
newsletter_rows = []
contact_newsletters: List[NewsletterSchema] = contact.newsletters
contact_email_id = str(contact.email.email_id)
contact_email_format = contact.email.email_format
contact_email_lang = contact.email.email_lang
skipped = []

# populate with all the sub_flags set to false
# they'll get set to true below, as-needed
for sub_flag in newsletters_mapping.values():
acoustic_main_table[sub_flag] = "0"

for newsletter in contact_newsletters:
newsletter_template = {
"email_id": contact_email_id,
"newsletter_format": contact_email_format,
"newsletter_lang": contact_email_lang,
}

if newsletter.name in AcousticResources.MAIN_TABLE_SUBSCR_FLAGS:
if newsletter.name in newsletters_mapping:
newsletter_dict = newsletter.dict()
_today = datetime.date.today().isoformat()
newsletter_template["create_timestamp"] = newsletter_dict.get(
Expand All @@ -268,9 +249,7 @@ def _newsletter_converter(self, acoustic_main_table, contact):
newsletter_rows.append(newsletter_template)
# and finally flip the main table's sub_<newsletter> flag to true for each subscription
if newsletter.subscribed:
acoustic_main_table[
AcousticResources.MAIN_TABLE_SUBSCR_FLAGS[newsletter.name]
] = "1"
acoustic_main_table[newsletters_mapping[newsletter.name]] = "1"
else:
skipped.append(newsletter.name)
if skipped:
Expand Down Expand Up @@ -418,7 +397,10 @@ def _insert_update_relational_table(self, table_name, rows):
self.context[f"{table_name}_duration_s"] = duration_s

def attempt_to_upload_ctms_contact(
self, contact: ContactSchema, main_fields: set[str]
self,
contact: ContactSchema,
main_fields: set[str],
newsletters_mapping: dict[str, str],
) -> bool:
"""
Expand All @@ -428,7 +410,7 @@ def attempt_to_upload_ctms_contact(
self.context = {}
try:
main_table_data, nl_data, prod_data = self.convert_ctms_to_acoustic(
contact, main_fields
contact, main_fields, newsletters_mapping
)
main_table_id = str(self.acoustic_main_table_id)
email_id = main_table_data["email_id"]
Expand Down
61 changes: 61 additions & 0 deletions ctms/bin/acoustic_newsletters_mapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/usr/bin/env python3
"""Manage Acoustic newsletters mapping: add and remove from the db."""

import argparse
import sys

from ctms import config
from ctms.database import get_db_engine
from ctms.models import AcousticNewsletterMapping


def main(dbsession, test_args=None) -> int:
parser = argparse.ArgumentParser(
description="""
Manage Acoustic Newsletter mapping
"""
)
subparsers = parser.add_subparsers(dest="action")
parser_add = subparsers.add_parser("add")
parser_add.add_argument("mapping")

parser_remove = subparsers.add_parser("remove")
parser_remove.add_argument("mapping")

parser_list = subparsers.add_parser("list")

args = parser.parse_args(args=test_args)
if args.action == "add":
source, destination = args.mapping.split(":")
# This will fail if mapping already exists.
dbsession.add(AcousticNewsletterMapping(source=source, destination=destination))
dbsession.commit()
print("Added.")
elif args.action == "remove":
row = (
dbsession.query(AcousticNewsletterMapping)
.filter(AcousticNewsletterMapping.source == args.mapping)
.one_or_none()
)
if not row:
print(f"Unknown mapping '{args.mapping}'. Give up.")
return 2
dbsession.delete(row)
dbsession.commit()
print("Removed.")
else:
entries = dbsession.query(AcousticNewsletterMapping).all()
print("\n".join(sorted(f"- {e.source!r}{e.destination!r}" for e in entries)))

return 0


if __name__ == "__main__":
# Get the database
config_settings = config.Settings()
engine, session_factory = get_db_engine(config_settings)
session = session_factory()
with engine.connect() as connection:
ret = main(session) # pylint:disable = invalid-name

sys.exit(ret)
7 changes: 7 additions & 0 deletions ctms/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,13 @@ class AcousticField(Base):
)


class AcousticNewsletterMapping(Base):
__tablename__ = "acoustic_newsletter_mapping"

source = Column(String, primary_key=True)
destination = Column(String)


class VpnWaitlist(Base):
__tablename__ = "vpn_waitlist"

Expand Down
32 changes: 26 additions & 6 deletions ctms/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
get_all_acoustic_retries_count,
retry_acoustic_record,
)
from ctms.models import AcousticField, PendingAcousticRecord
from ctms.models import AcousticField, AcousticNewsletterMapping, PendingAcousticRecord
from ctms.schemas import ContactSchema


Expand Down Expand Up @@ -53,7 +53,12 @@ def __init__(
self.is_acoustic_enabled = is_acoustic_enabled
self.metric_service = metric_service

def sync_contact_with_acoustic(self, contact: ContactSchema, main_fields: set[str]):
def sync_contact_with_acoustic(
self,
contact: ContactSchema,
main_fields: set[str],
newsletters_mapping: dict[str, str],
):
"""
:param contact:
Expand All @@ -62,22 +67,28 @@ def sync_contact_with_acoustic(self, contact: ContactSchema, main_fields: set[st
try:
# Convert ContactSchema to Acoustic Readable, attempt API call
return self.ctms_to_acoustic.attempt_to_upload_ctms_contact(
contact, main_fields
contact, main_fields, newsletters_mapping
)
except Exception: # pylint: disable=W0703
self.logger.exception("Error executing sync.sync_contact_with_acoustic")
return False

def _sync_pending_record(
self, db, pending_record: PendingAcousticRecord, main_fields: set[str]
self,
db,
pending_record: PendingAcousticRecord,
main_fields: set[str],
newsletters_mapping: dict[str, str],
) -> str:
state = "unknown"
try:
if self.is_acoustic_enabled:
contact: ContactSchema = get_acoustic_record_as_contact(
db, pending_record
)
is_success = self.sync_contact_with_acoustic(contact, main_fields)
is_success = self.sync_contact_with_acoustic(
contact, main_fields, newsletters_mapping
)
else:
self.logger.debug(
"Acoustic is not currently enabled. Records will be classified as successful and "
Expand Down Expand Up @@ -129,6 +140,13 @@ def sync_records(self, db, end_time=None) -> Dict[str, Union[int, str]]:
)
main_fields = {entry.field for entry in main_acoustic_fields}

newsletters_mapping_entries: List[AcousticNewsletterMapping] = db.query(
AcousticNewsletterMapping
).all()
newsletters_mapping = {
entry.source: entry.destination for entry in newsletters_mapping_entries
}

# Get all Records before current time
all_acoustic_records_before_now: List[
PendingAcousticRecord
Expand All @@ -144,7 +162,9 @@ def sync_records(self, db, end_time=None) -> Dict[str, Union[int, str]]:
states: Dict[str, int] = defaultdict(int)
record_created = None
for acoustic_record in all_acoustic_records_before_now:
state = self._sync_pending_record(db, acoustic_record, main_fields)
state = self._sync_pending_record(
db, acoustic_record, main_fields, newsletters_mapping
)
total += 1

states[state] += 1
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Add Acoustic Newsletter Mapping
Revision ID: f8b8f6aa0e96
Revises: 06f24cc01c09
Create Date: 2022-11-15 12:20:10.096170
"""
# pylint: disable=no-member invalid-name
# no-member is triggered by alembic.op, which has dynamically added functions
# invalid-name is triggered by migration file names with a date prefix
# invalid-name is triggered by top-level alembic constants like revision instead of REVISION

import sqlalchemy as sa
from alembic import op


# revision identifiers, used by Alembic.
revision = "f8b8f6aa0e96" # pragma: allowlist secret
down_revision = "06f24cc01c09" # pragma: allowlist secret
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
acoustic_mapping_table = op.create_table(
"acoustic_newsletter_mapping",
sa.Column("source", sa.String(), nullable=False),
sa.Column("destination", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("source"),
)
# ### end Alembic commands ###

# This mapping used to be hard-coded. We now initialize the freshly created
# table with it, but the records of this table will likely be modified
# eventually.
MAIN_TABLE_SUBSCR_FLAGS = {
# maps the Basket/CTMS newsletter name to the Acoustic name
"about-mozilla": "sub_about_mozilla",
"app-dev": "sub_apps_and_hacks",
"common-voice": "sub_common_voice",
"firefox-accounts-journey": "sub_firefox_accounts_journey",
"firefox-news": "sub_firefox_news",
"firefox-sweepstakes": "sub_firefox_sweepstakes",
"hubs": "sub_hubs",
"internet-health-report": "sub_internet_health_report",
"knowledge-is-power": "sub_knowledge_is_power",
"miti": "sub_miti",
"mixed-reality": "sub_mixed_reality",
"mozilla-and-you": "sub_firefox_news",
"mozilla-fellowship-awardee-alumni": "sub_mozilla_fellowship_awardee_alumni",
"mozilla-festival": "sub_mozilla_festival",
"mozilla-foundation": "sub_mozilla_foundation",
"mozilla-rally": "sub_rally",
"mozilla-technology": "sub_mozilla_technology",
"mozillians-nda": "sub_mozillians_nda",
"security-privacy-news": "sub_security_privacy_news",
"take-action-for-the-internet": "sub_take_action_for_the_internet",
"test-pilot": "sub_test_pilot",
}
op.bulk_insert(
acoustic_mapping_table,
[{"source": k, "destination": v} for k, v in MAIN_TABLE_SUBSCR_FLAGS.items()],
)


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("acoustic_newsletter_mapping")
# ### end Alembic commands ###
8 changes: 7 additions & 1 deletion tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
get_stripe_products,
get_vpn_by_email_id,
)
from ctms.models import AcousticField
from ctms.models import AcousticField, AcousticNewsletterMapping
from ctms.schemas import (
ApiClientSchema,
ContactSchema,
Expand Down Expand Up @@ -170,6 +170,12 @@ def main_acoustic_fields(dbsession):
return {r.field for r in records}


@pytest.fixture
def acoustic_newsletters_mapping(dbsession):
records = dbsession.query(AcousticNewsletterMapping).all()
return {r.source: r.destination for r in records}


@pytest.fixture
def anon_client():
"""A test client with no authorization."""
Expand Down
Loading

0 comments on commit 7463ef9

Please sign in to comment.