diff --git a/ctms/bin/acoustic.py b/ctms/bin/acoustic.py index 3b852a98..a8e4a111 100755 --- a/ctms/bin/acoustic.py +++ b/ctms/bin/acoustic.py @@ -11,6 +11,7 @@ bulk_schedule_acoustic_records, get_contacts_from_newsletter, get_contacts_from_waitlist, + reset_retry_acoustic_records, ) from ctms.database import SessionLocal from ctms.exception_capture import init_sentry @@ -41,6 +42,13 @@ def cli(ctx): default=False, help="Automatic yes to prompts.", ) +@click.option( + "--reset-retries", + is_flag=True, + show_default=True, + default=False, + help="Reset retry count of failing contacts", +) @click.option("--email-file", type=click.File("r")) @click.option("--newsletter") @click.option("--waitlist") @@ -48,18 +56,30 @@ def cli(ctx): def resync( ctx, yes: bool, + reset_retries: bool, email_file: TextIO, newsletter: Optional[str] = None, waitlist: Optional[str] = None, ): """CTMS command to sync contacts with Acoustic.""" with SessionLocal() as dbsession: - return do_resync(dbsession, yes, email_file, newsletter, waitlist) + return do_resync( + dbsession, yes, reset_retries, email_file, newsletter, waitlist + ) def do_resync( - dbsession, assume_yes=False, emails_file=None, newsletter=None, waitlist=None + dbsession, + assume_yes=False, + reset_retries=False, + emails_file=None, + newsletter=None, + waitlist=None, ): + resetted = 0 + if reset_retries: + resetted = reset_retry_acoustic_records(dbsession) + to_resync = [] if emails_file: for line in emails_file.readlines(): @@ -77,7 +97,7 @@ def do_resync( raise ValueError(f"Unknown waitlist {waitlist!r}") to_resync.extend(c.email.primary_email for c in contacts) - print(f"Force resync of {len(to_resync)} contacts") + print(f"Force resync of {resetted + len(to_resync)} contacts") if to_resync and (assume_yes or confirm("Continue?")): bulk_schedule_acoustic_records(dbsession, to_resync) dbsession.commit() diff --git a/ctms/crud.py b/ctms/crud.py index 752c1336..1e67af15 100644 --- a/ctms/crud.py +++ b/ctms/crud.py @@ -7,7 +7,7 @@ from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar, cast from pydantic import UUID4 -from sqlalchemy import asc, or_ +from sqlalchemy import asc, or_, update from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session, joinedload, load_only, selectinload @@ -376,6 +376,18 @@ def bulk_schedule_acoustic_records(db: Session, primary_emails: list[str]): ) +def reset_retry_acoustic_records(db: Session): + pending_records = ( + db.query(PendingAcousticRecord).filter(PendingAcousticRecord.retry > 0).all() + ) + count = len(pending_records) + db.execute( + update(PendingAcousticRecord), + [{"id": record.id, "retry": 0} for record in (pending_records)], + ) + return count + + def schedule_acoustic_record( db: Session, email_id: UUID4, diff --git a/tests/unit/test_bin_acoustic_resync.py b/tests/unit/test_bin_acoustic_resync.py index 803f8f51..a9520500 100644 --- a/tests/unit/test_bin_acoustic_resync.py +++ b/tests/unit/test_bin_acoustic_resync.py @@ -32,3 +32,21 @@ def test_main_force_resync_by_email_list(dbsession, sample_contacts, tmpdir): do_resync(dbsession, assume_yes=True, emails_file=f) assert len(dbsession.query(PendingAcousticRecord).all()) > 0 + + +def test_main_force_resync_by_reset_retry(dbsession, sample_contacts, tmpdir): + _, some_contact = sample_contacts["maximal"] + record = PendingAcousticRecord(email_id=some_contact.email.email_id, retry=99) + dbsession.add(record) + dbsession.flush() + + do_resync(dbsession, reset_retries=True) + + assert ( + len( + dbsession.query(PendingAcousticRecord) + .filter(PendingAcousticRecord.retry > 0) + .all() + ) + == 0 + )