From f938d94c0332948114edb1fdeb1dc8ad18997802 Mon Sep 17 00:00:00 2001 From: Graham Beckley Date: Thu, 4 May 2023 17:21:02 -0400 Subject: [PATCH 1/3] Add method for generating a contact from an email --- ctms/app.py | 9 +-- ctms/crud.py | 129 +--------------------------------------- ctms/schemas/contact.py | 103 +++++++++++++++++++++++++++++++- 3 files changed, 106 insertions(+), 135 deletions(-) diff --git a/ctms/app.py b/ctms/app.py index 6327e645..7a5e06f5 100644 --- a/ctms/app.py +++ b/ctms/app.py @@ -155,14 +155,7 @@ def get_email_or_404(db: Session, email_id) -> Email: def get_contact_or_404(db: Session, email_id) -> ContactSchema: """Get a contact by email_ID, or raise a 404 exception.""" email = get_email_or_404(db, email_id) - return ContactSchema( - amo=email.amo, - email=email, - fxa=email.fxa, - mofo=email.mofo, - newsletters=email.newsletters, - waitlists=email.waitlists, - ) + return ContactSchema.from_email(email) def all_ids( diff --git a/ctms/crud.py b/ctms/crud.py index dbc84d21..7b70b6e7 100644 --- a/ctms/crud.py +++ b/ctms/crud.py @@ -1,7 +1,6 @@ from __future__ import annotations import uuid -from collections import defaultdict from datetime import datetime, timezone from functools import partial from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar, cast @@ -44,7 +43,6 @@ FirefoxAccountsInSchema, MozillaFoundationInSchema, NewsletterInSchema, - ProductBaseSchema, StripeCustomerCreateSchema, StripeInvoiceCreateSchema, StripeInvoiceLineItemCreateSchema, @@ -153,19 +151,7 @@ def get_bulk_contacts( .all() ) - return [ - ContactSchema.parse_obj( - { - "amo": email.amo, - "email": email, - "fxa": email.fxa, - "mofo": email.mofo, - "newsletters": email.newsletters, - "waitlists": email.waitlists, - } - ) - for email in bulk_contacts - ] + return [ContactSchema.from_email(email) for email in bulk_contacts] def get_email(db: Session, email_id: UUID4) -> Optional[Email]: @@ -181,16 +167,7 @@ def get_contact_by_email_id(db: Session, email_id: UUID4) -> Optional[ContactSch email = get_email(db, email_id) if email is None: return None - products = get_stripe_products(email) - return ContactSchema( - amo=email.amo, - email=email, - fxa=email.fxa, - mofo=email.mofo, - newsletters=email.newsletters, - products=products, - waitlists=email.waitlists, - ) + return ContactSchema.from_email(email) def get_contacts_by_any_id( @@ -252,17 +229,7 @@ def get_contacts_by_any_id( fxa_primary_email_insensitive_comparator=fxa_primary_email ) emails = cast(List[Email], statement.all()) - return [ - ContactSchema( - amo=email.amo, - email=email, - fxa=email.fxa, - mofo=email.mofo, - newsletters=email.newsletters, - waitlists=email.waitlists, - ) - for email in emails - ] + return [ContactSchema.from_email(email) for email in emails] def _acoustic_sync_retry_query(db: Session): @@ -784,96 +751,6 @@ def get_stripe_customer_by_fxa_id( return cast(Optional[StripeCustomer], obj) -def get_stripe_products(email: Email) -> List[ProductBaseSchema]: - """Return a list of Stripe products for the contact, if any.""" - if not email.stripe_customer: - return [] - - base_data: Dict[str, Any] = { - "payment_service": "stripe", - # These come from the Payment Method, not imported from Stripe. - "payment_type": None, - "card_brand": None, - "card_last4": None, - "billing_country": None, - } - by_product = defaultdict(list) - - for subscription in email.stripe_customer.subscriptions: - subscription_data = base_data.copy() - subscription_data.update( - { - "status": subscription.status, - "created": subscription.stripe_created, - "start": subscription.start_date, - "current_period_start": subscription.current_period_start, - "current_period_end": subscription.current_period_end, - "canceled_at": subscription.canceled_at, - "cancel_at_period_end": subscription.cancel_at_period_end, - "ended_at": subscription.ended_at, - } - ) - for item in subscription.subscription_items: - product_data = subscription_data.copy() - price = item.price - product_data.update( - { - "product_id": price.stripe_product_id, - "product_name": None, # Products are not imported - "price_id": price.stripe_id, - "currency": price.currency, - "amount": price.unit_amount, - "interval_count": price.recurring_interval_count, - "interval": price.recurring_interval, - } - ) - by_product[price.stripe_product_id].append(product_data) - - products = [] - for subscriptions in by_product.values(): - # Sort to find the latest subscription - def get_current_period(sub: Dict) -> datetime: - return cast(datetime, sub["current_period_end"]) - - subscriptions.sort(key=get_current_period, reverse=True) - latest = subscriptions[0] - data = latest.copy() - if len(subscriptions) == 1: - segment_prefix = "" - else: - segment_prefix = "re-" - if latest["status"] == "active": - if latest["canceled_at"]: - segment = "cancelling" - changed = latest["canceled_at"] - else: - segment = "active" - changed = latest["start"] - elif latest["status"] == "canceled": - segment = "canceled" - changed = latest["ended_at"] - else: - segment_prefix = "" - segment = "other" - changed = latest["created"] - - assert changed - data.update( - { - "sub_count": len(subscriptions), - "segment": f"{segment_prefix}{segment}", - "changed": changed, - } - ) - products.append(ProductBaseSchema(**data)) - - def get_product_id(prod: ProductBaseSchema) -> str: - return prod.product_id or "" - - products.sort(key=get_product_id) - return products - - def get_all_acoustic_fields(dbsession: Session, tablename: Optional[str] = None): query = dbsession.query(AcousticField).order_by( asc(AcousticField.tablename), asc(AcousticField.field) diff --git a/ctms/schemas/contact.py b/ctms/schemas/contact.py index 09b54782..263f283f 100644 --- a/ctms/schemas/contact.py +++ b/ctms/schemas/contact.py @@ -1,5 +1,6 @@ +from collections import defaultdict from datetime import datetime -from typing import List, Literal, Optional, Set, Union +from typing import TYPE_CHECKING, List, Literal, Optional, Set, Union, cast from uuid import UUID from pydantic import AnyUrl, BaseModel, Field, root_validator, validator @@ -27,6 +28,94 @@ validate_waitlist_newsletters, ) +if TYPE_CHECKING: + from models import Email + + +def get_stripe_products(email: "Email") -> List[ProductBaseSchema]: + """Return a list of Stripe products for the contact, if any.""" + if not email.stripe_customer: + return [] + + base_data: dict[str, Any] = { + "payment_service": "stripe", + # These come from the Payment Method, not imported from Stripe. + "payment_type": None, + "card_brand": None, + "card_last4": None, + "billing_country": None, + } + by_product = defaultdict(list) + + for subscription in email.stripe_customer.subscriptions: + subscription_data = base_data.copy() + subscription_data.update( + { + "status": subscription.status, + "created": subscription.stripe_created, + "start": subscription.start_date, + "current_period_start": subscription.current_period_start, + "current_period_end": subscription.current_period_end, + "canceled_at": subscription.canceled_at, + "cancel_at_period_end": subscription.cancel_at_period_end, + "ended_at": subscription.ended_at, + } + ) + for item in subscription.subscription_items: + product_data = subscription_data.copy() + price = item.price + product_data.update( + { + "product_id": price.stripe_product_id, + "product_name": None, # Products are not imported + "price_id": price.stripe_id, + "currency": price.currency, + "amount": price.unit_amount, + "interval_count": price.recurring_interval_count, + "interval": price.recurring_interval, + } + ) + by_product[price.stripe_product_id].append(product_data) + + products = [] + for subscriptions in by_product.values(): + subscriptions.sort( + key=lambda sub: cast(datetime, sub["current_period_end"]), reverse=True + ) + latest = subscriptions[0] + data = latest.copy() + if len(subscriptions) == 1: + segment_prefix = "" + else: + segment_prefix = "re-" + if latest["status"] == "active": + if latest["canceled_at"]: + segment = "cancelling" + changed = latest["canceled_at"] + else: + segment = "active" + changed = latest["start"] + elif latest["status"] == "canceled": + segment = "canceled" + changed = latest["ended_at"] + else: + segment_prefix = "" + segment = "other" + changed = latest["created"] + + assert changed + data.update( + { + "sub_count": len(subscriptions), + "segment": f"{segment_prefix}{segment}", + "changed": changed, + } + ) + products.append(ProductBaseSchema(**data)) + + products.sort(key=lambda prod: prod.product_id or "") + return products + class ContactSchema(ComparableBase): """A complete contact.""" @@ -39,6 +128,18 @@ class ContactSchema(ComparableBase): waitlists: List[WaitlistSchema] = [] products: List[ProductBaseSchema] = [] + @classmethod + def from_email(cls, email: "Email") -> "ContactSchema": + return cls( + amo=email.amo, + email=email, + fxa=email.fxa, + mofo=email.mofo, + newsletters=email.newsletters, + waitlists=email.waitlists, + products=get_stripe_products(email), + ) + class Config: fields = { "newsletters": { From 1e783210aca60082634fb2d5b416da312e1c5359 Mon Sep 17 00:00:00 2001 From: Graham Beckley Date: Fri, 5 May 2023 11:16:38 -0400 Subject: [PATCH 2/3] Break up get_stripe_products --- ctms/schemas/contact.py | 132 ++++++++++++++++++++-------------------- 1 file changed, 66 insertions(+), 66 deletions(-) diff --git a/ctms/schemas/contact.py b/ctms/schemas/contact.py index 263f283f..a69104cd 100644 --- a/ctms/schemas/contact.py +++ b/ctms/schemas/contact.py @@ -32,25 +32,21 @@ from models import Email -def get_stripe_products(email: "Email") -> List[ProductBaseSchema]: - """Return a list of Stripe products for the contact, if any.""" - if not email.stripe_customer: - return [] - - base_data: dict[str, Any] = { - "payment_service": "stripe", - # These come from the Payment Method, not imported from Stripe. - "payment_type": None, - "card_brand": None, - "card_last4": None, - "billing_country": None, - } +def _subscriptions_by_product(subscriptions): by_product = defaultdict(list) - for subscription in email.stripe_customer.subscriptions: - subscription_data = base_data.copy() - subscription_data.update( - { + for subscription in subscriptions: + for item in subscription.subscription_items: + price = item.price + product_data = { + "payment_service": "stripe", + ### + # These come from the Payment Method, not imported from Stripe. + "payment_type": None, + "card_brand": None, + "card_last4": None, + "billing_country": None, + ### "status": subscription.status, "created": subscription.stripe_created, "start": subscription.start_date, @@ -59,60 +55,64 @@ def get_stripe_products(email: "Email") -> List[ProductBaseSchema]: "canceled_at": subscription.canceled_at, "cancel_at_period_end": subscription.cancel_at_period_end, "ended_at": subscription.ended_at, + "product_id": price.stripe_product_id, + "product_name": None, # Products are not imported + "price_id": price.stripe_id, + "currency": price.currency, + "amount": price.unit_amount, + "interval_count": price.recurring_interval_count, + "interval": price.recurring_interval, } - ) - for item in subscription.subscription_items: - product_data = subscription_data.copy() - price = item.price - product_data.update( - { - "product_id": price.stripe_product_id, - "product_name": None, # Products are not imported - "price_id": price.stripe_id, - "currency": price.currency, - "amount": price.unit_amount, - "interval_count": price.recurring_interval_count, - "interval": price.recurring_interval, - } - ) by_product[price.stripe_product_id].append(product_data) + return by_product - products = [] - for subscriptions in by_product.values(): - subscriptions.sort( - key=lambda sub: cast(datetime, sub["current_period_end"]), reverse=True - ) - latest = subscriptions[0] - data = latest.copy() - if len(subscriptions) == 1: - segment_prefix = "" - else: - segment_prefix = "re-" - if latest["status"] == "active": - if latest["canceled_at"]: - segment = "cancelling" - changed = latest["canceled_at"] - else: - segment = "active" - changed = latest["start"] - elif latest["status"] == "canceled": - segment = "canceled" - changed = latest["ended_at"] + +def _product_metadata(subscriptions_by_product): + latest = max( + subscriptions_by_product, + key=lambda sub: cast(datetime, sub["current_period_end"]), + ) + if len(subscriptions_by_product) == 1: + segment_prefix = "" + else: + segment_prefix = "re-" + if latest["status"] == "active": + if latest["canceled_at"]: + segment = "cancelling" + changed = latest["canceled_at"] else: - segment_prefix = "" - segment = "other" - changed = latest["created"] - - assert changed - data.update( - { - "sub_count": len(subscriptions), - "segment": f"{segment_prefix}{segment}", - "changed": changed, - } - ) - products.append(ProductBaseSchema(**data)) + segment = "active" + changed = latest["start"] + elif latest["status"] == "canceled": + segment = "canceled" + changed = latest["ended_at"] + else: + segment_prefix = "" + segment = "other" + changed = latest["created"] + + assert changed + latest.update( + { + "sub_count": len(subscriptions_by_product), + "segment": f"{segment_prefix}{segment}", + "changed": changed, + } + ) + return ProductBaseSchema(**latest) + +def get_stripe_products(email: "Email") -> List[ProductBaseSchema]: + """Return a list of Stripe products for the contact, if any.""" + if not email.stripe_customer: + return [] + subscription_metadata_by_product = _subscriptions_by_product( + email.stripe_customer.subscriptions + ) + products = [ + _product_metadata(subscriptions) + for subscriptions in subscription_metadata_by_product.values() + ] products.sort(key=lambda prod: prod.product_id or "") return products From 8dc49ff25a7cab22659bbda875fff29ee648936a Mon Sep 17 00:00:00 2001 From: Graham Beckley Date: Fri, 12 May 2023 14:04:48 -0400 Subject: [PATCH 3/3] Refactor the way we build products for a customer --- ctms/schemas/contact.py | 132 +++++++++++++++++++++++----------------- 1 file changed, 76 insertions(+), 56 deletions(-) diff --git a/ctms/schemas/contact.py b/ctms/schemas/contact.py index a69104cd..889309a6 100644 --- a/ctms/schemas/contact.py +++ b/ctms/schemas/contact.py @@ -17,7 +17,7 @@ from .fxa import FirefoxAccountsInSchema, FirefoxAccountsSchema from .mofo import MozillaFoundationInSchema, MozillaFoundationSchema from .newsletter import NewsletterInSchema, NewsletterSchema -from .product import ProductBaseSchema +from .product import ProductBaseSchema, ProductSegmentEnum from .waitlist import ( RelayWaitlistInSchema, RelayWaitlistSchema, @@ -29,89 +29,109 @@ ) if TYPE_CHECKING: - from models import Email + from models import Email, StripeSubscription, StripeSubscriptionItem -def _subscriptions_by_product(subscriptions): +def _subscription_items_by_product( + subscriptions: list["StripeSubscription"], +) -> dict[str, "StripeSubscriptionItem"]: + """Groups Stripe subscription items by the Stripe product ID they're associated with""" by_product = defaultdict(list) for subscription in subscriptions: for item in subscription.subscription_items: - price = item.price - product_data = { - "payment_service": "stripe", - ### - # These come from the Payment Method, not imported from Stripe. - "payment_type": None, - "card_brand": None, - "card_last4": None, - "billing_country": None, - ### - "status": subscription.status, - "created": subscription.stripe_created, - "start": subscription.start_date, - "current_period_start": subscription.current_period_start, - "current_period_end": subscription.current_period_end, - "canceled_at": subscription.canceled_at, - "cancel_at_period_end": subscription.cancel_at_period_end, - "ended_at": subscription.ended_at, - "product_id": price.stripe_product_id, - "product_name": None, # Products are not imported - "price_id": price.stripe_id, - "currency": price.currency, - "amount": price.unit_amount, - "interval_count": price.recurring_interval_count, - "interval": price.recurring_interval, - } - by_product[price.stripe_product_id].append(product_data) + by_product[item.price.stripe_product_id].append(item) return by_product -def _product_metadata(subscriptions_by_product): - latest = max( - subscriptions_by_product, - key=lambda sub: cast(datetime, sub["current_period_end"]), - ) - if len(subscriptions_by_product) == 1: - segment_prefix = "" - else: - segment_prefix = "re-" - if latest["status"] == "active": - if latest["canceled_at"]: +def _determine_segment( + latest: "StripeSubscription", num_subscriptions: int +) -> ProductSegmentEnum: + """Use product subscription data to determine the marketing segment for + a customer as it pertains to a particular product""" + + segment_prefix = "" if num_subscriptions == 1 else "re-" + if latest.status == "active": + if latest.canceled_at: segment = "cancelling" - changed = latest["canceled_at"] else: segment = "active" - changed = latest["start"] - elif latest["status"] == "canceled": + elif latest.status == "canceled": segment = "canceled" - changed = latest["ended_at"] else: segment_prefix = "" segment = "other" - changed = latest["created"] + return ProductSegmentEnum(segment_prefix + segment) + + +def _determine_changed(latest: "StripeSubscription") -> datetime: + if latest.status == "active": + if latest.canceled_at: + changed = latest.canceled_at + else: + changed = latest.start_date + elif latest.status == "canceled": + changed = latest.ended_at + else: + changed = latest.stripe_created assert changed - latest.update( - { - "sub_count": len(subscriptions_by_product), - "segment": f"{segment_prefix}{segment}", - "changed": changed, - } + return cast(datetime, changed) + + +def _product_metadata(product_subscription_items: list["StripeSubscriptionItem"]): + """Generate metadata about a Stripe product as it pertains to a Stripe customer. + + We use the latest subscription item that relates to a particular stripe product to + generate metadata concerning a customer's relationship to that product. + """ + + latest = max( + product_subscription_items, + key=lambda sub_item: cast(datetime, sub_item.subscription.current_period_end), + ) + return ProductBaseSchema( + payment_service="stripe", + ### + # These come from the Payment Method, not imported from Stripe. + payment_type=None, + card_brand=None, + card_last4=None, + billing_country=None, + ### + status=latest.subscription.status, + created=latest.subscription.stripe_created, + start=latest.subscription.start_date, + current_period_start=latest.subscription.current_period_start, + current_period_end=latest.subscription.current_period_end, + canceled_at=latest.subscription.canceled_at, + cancel_at_period_end=latest.subscription.cancel_at_period_end, + ended_at=latest.subscription.ended_at, + product_id=latest.price.stripe_product_id, + product_name=None, # Products are not imported + price_id=latest.price.stripe_id, + currency=latest.price.currency, + amount=latest.price.unit_amount, + interval_count=latest.price.recurring_interval_count, + interval=latest.price.recurring_interval, + sub_count=len(product_subscription_items), + segment=_determine_segment( + latest.subscription, len(product_subscription_items) + ), + changed=_determine_changed(latest.subscription), ) - return ProductBaseSchema(**latest) def get_stripe_products(email: "Email") -> List[ProductBaseSchema]: """Return a list of Stripe products for the contact, if any.""" if not email.stripe_customer: return [] - subscription_metadata_by_product = _subscriptions_by_product( + sub_items_by_product = _subscription_items_by_product( email.stripe_customer.subscriptions ) products = [ - _product_metadata(subscriptions) - for subscriptions in subscription_metadata_by_product.values() + _product_metadata(product_subscription_items) + for product_subscription_items in sub_items_by_product.values() ] products.sort(key=lambda prod: prod.product_id or "") return products