From 299b18c704f3505c139d329336dd86785d4e0b5f Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Mon, 13 Mar 2023 14:43:10 +0100 Subject: [PATCH 1/4] Ref #565: export contacts database as CSV --- ctms/bin/acoustic.py | 83 ++++++++++++++++++++++++++++++++++++++++++++ ctms/crud.py | 12 +++++++ 2 files changed, 95 insertions(+) diff --git a/ctms/bin/acoustic.py b/ctms/bin/acoustic.py index a3bd3b47..ac5b2aa1 100755 --- a/ctms/bin/acoustic.py +++ b/ctms/bin/acoustic.py @@ -1,12 +1,16 @@ #!/usr/bin/env python3 """Schedule contacts to be synced to Acoustic.""" +import csv import os import sys from typing import Optional, TextIO +from uuid import UUID import click +import sqlalchemy from ctms import config +from ctms.acoustic_service import CTMSToAcousticService from ctms.crud import ( bulk_schedule_acoustic_records, create_acoustic_field, @@ -15,6 +19,7 @@ delete_acoustic_newsletters_mapping, get_all_acoustic_fields, get_all_acoustic_newsletters_mapping, + get_all_contacts_from_ids, get_contacts_from_newsletter, get_contacts_from_waitlist, reset_retry_acoustic_records, @@ -22,6 +27,7 @@ from ctms.database import SessionLocal from ctms.exception_capture import init_sentry from ctms.log import configure_logging +from ctms.schemas.contact import ContactSchema, get_stripe_products def confirm(msg): @@ -198,5 +204,82 @@ def do_resync( return os.EX_OK +@cli.command(help="Dump the contacts database in the same format as Acoustic") +@click.option( + "-q", + "--query", + help="Query to select contacts to be dumped", + default="SELECT email_id FROM emails;", +) +@click.option("-o", "--output", type=click.File("w")) +@click.pass_context +def dump( + ctx, + query: str, + output: TextIO, +): + """CTMS command to dump the contacts database.""" + if output is None: + output = sys.stdout + + with SessionLocal() as dbsession: + result = dbsession.execute(sqlalchemy.text(query)) + email_ids = [row[0] for row in result.all()] + + if not email_ids: + print("No contact found for query.") + sys.exit(os.EX_UNAVAILABLE) + + first = email_ids[0] + if not isinstance(first, UUID): + print(f"Query should return UUID, found: {first}") + sys.exit(os.EX_USAGE) + + answer = input(f"Dump CSV for {len(email_ids)} contacts [y/N]? ") + if not answer or answer.lower() != "y": + sys.exit(os.EX_OK) + + contacts = get_all_contacts_from_ids(dbsession, email_ids=email_ids) + return do_dump(dbsession, contacts, output) + + +def do_dump(dbsession, contacts, output: TextIO): + service = CTMSToAcousticService( + acoustic_client=None, + acoustic_main_table_id=-1, + acoustic_newsletter_table_id=-1, + acoustic_product_table_id=-1, + ) + main_fields = { + f.field for f in get_all_acoustic_fields(dbsession, tablename="main") + } + newsletters_mapping = { + m.source: m.destination for m in get_all_acoustic_newsletters_mapping(dbsession) + } + + fieldnames = None + writer = None + for email in contacts: + contact_mapping = { + "amo": email.amo, + "email": email, + "fxa": email.fxa, + "mofo": email.mofo, + "newsletters": email.newsletters, + "products": get_stripe_products(email), + "waitlists": email.waitlists, + } + contact = ContactSchema.parse_obj(contact_mapping) + main_table_row, _, _ = service.convert_ctms_to_acoustic( + contact, main_fields, newsletters_mapping + ) + # Write header on the first iteration. + if fieldnames is None: + fieldnames = sorted(main_table_row.keys()) + writer = csv.DictWriter(output, fieldnames=fieldnames) + writer.writeheader() + writer.writerow(main_table_row) + + if __name__ == "__main__": sys.exit(cli(obj={})) # pylint: disable=no-value-for-parameter diff --git a/ctms/crud.py b/ctms/crud.py index 7b70b6e7..43731ad4 100644 --- a/ctms/crud.py +++ b/ctms/crud.py @@ -105,6 +105,18 @@ def _contact_base_query(db): ) +def get_all_contacts(db): + """Fetch all contacts.""" + bulk_contacts = _contact_base_query(db) + return bulk_contacts.order_by(asc(Email.email_id)).all() + + +def get_all_contacts_from_ids(db, email_ids): + """Fetch all contacts that have the specified IDs.""" + bulk_contacts = _contact_base_query(db) + return bulk_contacts.filter(Email.email_id.in_(email_ids)).all() + + def get_bulk_query(start_time, end_time, after_email_uuid, mofo_relevant): filters = [ Email.update_timestamp >= start_time, From 6f15eac8c8372d32334addd5f026d084b5d390ec Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Thu, 15 Jun 2023 13:08:29 +0200 Subject: [PATCH 2/4] Update ctms/bin/acoustic.py Co-authored-by: grahamalama --- ctms/bin/acoustic.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/ctms/bin/acoustic.py b/ctms/bin/acoustic.py index ac5b2aa1..e280bb22 100755 --- a/ctms/bin/acoustic.py +++ b/ctms/bin/acoustic.py @@ -260,16 +260,7 @@ def do_dump(dbsession, contacts, output: TextIO): fieldnames = None writer = None for email in contacts: - contact_mapping = { - "amo": email.amo, - "email": email, - "fxa": email.fxa, - "mofo": email.mofo, - "newsletters": email.newsletters, - "products": get_stripe_products(email), - "waitlists": email.waitlists, - } - contact = ContactSchema.parse_obj(contact_mapping) + contact = ContactSchema.from_email(email) main_table_row, _, _ = service.convert_ctms_to_acoustic( contact, main_fields, newsletters_mapping ) From 71a29af523eccec6fed652bdf843aa31eea0b04d Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Thu, 15 Jun 2023 15:12:07 +0200 Subject: [PATCH 3/4] @grahamalama review --- ctms/bin/acoustic.py | 2 +- ctms/crud.py | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/ctms/bin/acoustic.py b/ctms/bin/acoustic.py index e280bb22..fcc69241 100755 --- a/ctms/bin/acoustic.py +++ b/ctms/bin/acoustic.py @@ -211,7 +211,7 @@ def do_resync( help="Query to select contacts to be dumped", default="SELECT email_id FROM emails;", ) -@click.option("-o", "--output", type=click.File("w")) +@click.option("-o", "--output", default="-", type=click.File("w")) @click.pass_context def dump( ctx, diff --git a/ctms/crud.py b/ctms/crud.py index 43731ad4..f865087e 100644 --- a/ctms/crud.py +++ b/ctms/crud.py @@ -105,12 +105,6 @@ def _contact_base_query(db): ) -def get_all_contacts(db): - """Fetch all contacts.""" - bulk_contacts = _contact_base_query(db) - return bulk_contacts.order_by(asc(Email.email_id)).all() - - def get_all_contacts_from_ids(db, email_ids): """Fetch all contacts that have the specified IDs.""" bulk_contacts = _contact_base_query(db) From a8efb52d32f287e4f4caf7cd12881d39e9cf870a Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Thu, 15 Jun 2023 16:13:04 +0200 Subject: [PATCH 4/4] Fix unused import --- ctms/bin/acoustic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ctms/bin/acoustic.py b/ctms/bin/acoustic.py index fcc69241..58ab4f9b 100755 --- a/ctms/bin/acoustic.py +++ b/ctms/bin/acoustic.py @@ -27,7 +27,7 @@ from ctms.database import SessionLocal from ctms.exception_capture import init_sentry from ctms.log import configure_logging -from ctms.schemas.contact import ContactSchema, get_stripe_products +from ctms.schemas.contact import ContactSchema def confirm(msg):