Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ref #565: export contacts database as CSV #686

Merged
merged 5 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions ctms/bin/acoustic.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
#!/usr/bin/env python3
"""Schedule contacts to be synced to Acoustic."""
import csv
import os
import sys
from typing import Optional, TextIO
from uuid import UUID

import click
import sqlalchemy

from ctms import config
from ctms.acoustic_service import CTMSToAcousticService
from ctms.crud import (
bulk_schedule_acoustic_records,
create_acoustic_field,
Expand All @@ -15,13 +19,15 @@
delete_acoustic_newsletters_mapping,
get_all_acoustic_fields,
get_all_acoustic_newsletters_mapping,
get_all_contacts_from_ids,
get_contacts_from_newsletter,
get_contacts_from_waitlist,
reset_retry_acoustic_records,
)
from ctms.database import SessionLocal
from ctms.exception_capture import init_sentry
from ctms.log import configure_logging
from ctms.schemas.contact import ContactSchema


def confirm(msg):
Expand Down Expand Up @@ -198,5 +204,73 @@ def do_resync(
return os.EX_OK


@cli.command(help="Dump the contacts database in the same format as Acoustic")
@click.option(
    "-q",
    "--query",
    help="Query to select contacts to be dumped",
    default="SELECT email_id FROM emails;",
)
@click.option("-o", "--output", default="-", type=click.File("w"))
@click.pass_context
def dump(
    ctx,
    query: str,
    output: TextIO,
):
    """CTMS command to dump the contacts database.

    Runs ``query``, takes the first column of every returned row as an email
    id, asks for confirmation, then hands the matching contacts to
    ``do_dump`` to be written to ``output`` as CSV.

    Status messages and the confirmation prompt are written to stderr:
    ``output`` defaults to ``-`` (stdout), so anything printed on stdout
    would end up inside the CSV when the dump is piped or redirected.

    Exits with ``os.EX_UNAVAILABLE`` when the query returns no row, with
    ``os.EX_USAGE`` when the first returned value is not a ``UUID``, and with
    ``os.EX_OK`` when the confirmation is declined.
    """
    if output is None:
        output = sys.stdout

    with SessionLocal() as dbsession:
        # NOTE(review): ``query`` is executed verbatim as raw SQL. This is
        # only acceptable because it comes from the operator's own command
        # line; never feed it from untrusted input.
        result = dbsession.execute(sqlalchemy.text(query))
        email_ids = [row[0] for row in result.all()]

        if not email_ids:
            print("No contact found for query.", file=sys.stderr)
            sys.exit(os.EX_UNAVAILABLE)

        # Only the first value is inspected, as a cheap sanity check that the
        # query selects the id column.
        first = email_ids[0]
        if not isinstance(first, UUID):
            print(f"Query should return UUID, found: {first}", file=sys.stderr)
            sys.exit(os.EX_USAGE)

        # ``input(prompt)`` writes its prompt to stdout; emit it on stderr
        # ourselves and read the answer without a prompt.
        print(
            f"Dump CSV for {len(email_ids)} contacts [y/N]? ",
            end="",
            file=sys.stderr,
            flush=True,
        )
        answer = input()
        # An empty answer also fails this comparison, so the default is "no".
        if answer.lower() != "y":
            sys.exit(os.EX_OK)

        contacts = get_all_contacts_from_ids(dbsession, email_ids=email_ids)
        return do_dump(dbsession, contacts, output)


def do_dump(dbsession, contacts, output: TextIO):
    """Convert each contact to its Acoustic form and write it to ``output`` as CSV.

    Every item of ``contacts`` is turned into a ``ContactSchema`` and passed
    to ``CTMSToAcousticService.convert_ctms_to_acoustic``; only the first
    element of the returned triple is written. The CSV header is the sorted
    keys of the first converted row, so nothing at all is written when
    ``contacts`` is empty.
    """
    # The service is built without a client and with placeholder table ids:
    # only its conversion method is called here.
    converter = CTMSToAcousticService(
        acoustic_client=None,
        acoustic_main_table_id=-1,
        acoustic_newsletter_table_id=-1,
        acoustic_product_table_id=-1,
    )
    acoustic_fields = {
        entry.field
        for entry in get_all_acoustic_fields(dbsession, tablename="main")
    }
    newsletter_names = {
        mapping.source: mapping.destination
        for mapping in get_all_acoustic_newsletters_mapping(dbsession)
    }

    csv_writer = None
    for email in contacts:
        row, _, _ = converter.convert_ctms_to_acoustic(
            ContactSchema.from_email(email), acoustic_fields, newsletter_names
        )
        # The writer is created lazily because the column names are only
        # known once the first row has been converted.
        if csv_writer is None:
            csv_writer = csv.DictWriter(output, fieldnames=sorted(row.keys()))
            csv_writer.writeheader()
        csv_writer.writerow(row)


# Script entry point: call ``cli`` (the object the commands in this file are
# registered on) with an empty dict as ``obj`` and use its result as the
# process exit status.
if __name__ == "__main__":
    sys.exit(cli(obj={})) # pylint: disable=no-value-for-parameter
6 changes: 6 additions & 0 deletions ctms/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ def _contact_base_query(db):
)


def get_all_contacts_from_ids(db, email_ids):
    """Fetch all contacts that have the specified IDs.

    ``db`` is handed to ``_contact_base_query``; the resulting query is
    restricted to rows whose ``Email.email_id`` is in ``email_ids``.

    Returns the list of matching results, or an empty list when ``email_ids``
    is empty (in which case no query is issued).
    """
    # Nothing can match an empty collection of ids: skip building and
    # executing an ``IN ()`` query altogether.
    if not email_ids:
        return []
    bulk_contacts = _contact_base_query(db)
    return bulk_contacts.filter(Email.email_id.in_(email_ids)).all()


def get_bulk_query(start_time, end_time, after_email_uuid, mofo_relevant):
filters = [
Email.update_timestamp >= start_time,
Expand Down