diff --git a/core/notifications/migrations/20240321091909_create_email_reminder_projection.up.sql b/core/notifications/migrations/20240321091909_create_email_reminder_projection.up.sql index 5adeb8ae772..ab66edd8da5 100644 --- a/core/notifications/migrations/20240321091909_create_email_reminder_projection.up.sql +++ b/core/notifications/migrations/20240321091909_create_email_reminder_projection.up.sql @@ -1,7 +1,7 @@ CREATE TABLE email_reminder_projection ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), galoy_user_id VARCHAR UNIQUE NOT NULL, - user_created_at TIMESTAMPTZ NOT NULL, - last_activity_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - last_transaction_at TIMESTAMPTZ + user_first_seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + last_transaction_at TIMESTAMPTZ, + last_notified_at TIMESTAMPTZ ); diff --git a/core/notifications/src/app/mod.rs b/core/notifications/src/app/mod.rs index 162388ec0b0..d3993280590 100644 --- a/core/notifications/src/app/mod.rs +++ b/core/notifications/src/app/mod.rs @@ -8,8 +8,13 @@ use tracing::instrument; use std::sync::Arc; use crate::{ - email_executor::EmailExecutor, email_reminder_projection::EmailReminderProjection, job, - notification_cool_off_tracker::*, notification_event::*, primitives::*, push_executor::*, + email_executor::EmailExecutor, + email_reminder_projection::EmailReminderProjection, + job::{self}, + notification_cool_off_tracker::*, + notification_event::*, + primitives::*, + push_executor::*, user_notification_settings::*, }; @@ -147,21 +152,7 @@ impl NotificationsApp { ) -> Result { let mut user_settings = self.settings.find_for_user_id(&user_id).await?; user_settings.add_push_device_token(device_token); - let new_user = user_settings.is_new(); - let mut tx = self.pool.begin().await?; - self.settings - .persist_in_tx(&mut tx, &mut user_settings) - .await?; - if new_user { - self.email_reminder_projection - .new_user_without_email( - &mut tx, - user_id, - user_settings.created_at().expect("already persisted"), - ) - .await?; - } - tx.commit().await?; + self.settings.persist(&mut user_settings).await?; Ok(user_settings) } @@ -185,7 +176,14 @@ impl NotificationsApp { ) -> Result<(), ApplicationError> { let mut user_settings = self.settings.find_for_user_id(&user_id).await?; user_settings.update_email_address(addr); - self.settings.persist(&mut user_settings).await?; + let mut tx = self.pool.begin().await?; + self.settings + .persist_in_tx(&mut tx, &mut user_settings) + .await?; + self.email_reminder_projection + .user_added_email(&mut tx, user_id) + .await?; + tx.commit().await?; Ok(()) } @@ -216,6 +214,29 @@ impl NotificationsApp { Ok(()) } + #[instrument(name = "app.handle_transaction_occurred_event", skip(self), err)] + pub async fn handle_transaction_occurred_event( + &self, + user_id: GaloyUserId, + transaction_occurred: TransactionOccurred, + ) -> Result<(), ApplicationError> { + let payload = NotificationEventPayload::from(transaction_occurred); + let mut tx = self.pool.begin().await?; + if payload.should_send_email() { + job::spawn_send_email_notification(&mut tx, (user_id.clone(), payload.clone())).await?; + } + job::spawn_send_push_notification(&mut tx, (user_id.clone(), payload.clone())).await?; + + let user_settings = self.settings.find_for_user_id(&user_id).await?; + if user_settings.email_address().is_none() { + self.email_reminder_projection + .transaction_occurred_for_user_without_email(&mut tx, user_id) + .await?; + } + tx.commit().await?; + Ok(()) + } + #[instrument(name = "app.handle_price_changed_event", skip(self), err)] pub async fn handle_price_changed_event( &self, @@ -268,11 +289,7 @@ impl NotificationsApp { ) -> Result<(), ApplicationError> { tokio::spawn(async move { loop { - let _ = job::spawn_link_email_reminder( - &pool, - NotificationEventPayload::from(LinkEmailReminder {}), - ) - .await; + let _ = job::spawn_link_email_reminder(&pool, ()).await; tokio::time::sleep(delay).await; } }); diff --git a/core/notifications/src/email_reminder_projection/mod.rs b/core/notifications/src/email_reminder_projection/mod.rs index bf87a970902..0f9dcb4b497 100644 --- a/core/notifications/src/email_reminder_projection/mod.rs +++ b/core/notifications/src/email_reminder_projection/mod.rs @@ -13,7 +13,13 @@ pub struct EmailReminderProjection { _pool: PgPool, } +const PAGINATION_BATCH_SIZE: i64 = 1000; + impl EmailReminderProjection { + const ACCOUNT_LIVENESS_THRESHOLD_DAYS: i64 = 21; + const ACCOUNT_AGED_THRESHOLD_DAYS: i64 = 21; + const NOTIFICATION_COOL_OFF_THRESHOLD_DAYS: i64 = 90; + pub fn new(pool: &PgPool) -> Self { Self { _pool: pool.clone(), @@ -24,16 +30,105 @@ impl EmailReminderProjection { &self, tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, galoy_user_id: GaloyUserId, - created_at: DateTime, + user_first_seen_at: DateTime, ) -> Result<(), EmailReminderProjectionError> { sqlx::query!( r#"INSERT INTO email_reminder_projection - (galoy_user_id, user_created_at) VALUES ($1, $2)"#, + (galoy_user_id, user_first_seen_at) VALUES ($1, $2)"#, galoy_user_id.as_ref(), - created_at + user_first_seen_at ) .execute(&mut **tx) .await?; Ok(()) } + + pub async fn transaction_occurred_for_user_without_email( + &self, + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + galoy_user_id: GaloyUserId, + ) -> Result<(), EmailReminderProjectionError> { + sqlx::query!( + r#"INSERT INTO email_reminder_projection + (galoy_user_id, last_transaction_at) VALUES ($1, now()) + ON CONFLICT (galoy_user_id) DO UPDATE + SET last_transaction_at = now()"#, + galoy_user_id.as_ref(), + ) + .execute(&mut **tx) + .await?; + Ok(()) + } + + pub async fn user_added_email( + &self, + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + galoy_user_id: GaloyUserId, + ) -> Result<(), EmailReminderProjectionError> { + sqlx::query!( + r#"DELETE FROM email_reminder_projection + WHERE galoy_user_id = $1"#, + galoy_user_id.as_ref() + ) + .execute(&mut **tx) + .await?; + Ok(()) + } + + pub async fn user_notified( + &self, + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + galoy_user_id: GaloyUserId, + ) -> Result<(), EmailReminderProjectionError> { + // set last notified at to now + sqlx::query!( + r#"UPDATE email_reminder_projection + SET last_notified_at = now() + WHERE galoy_user_id = $1"#, + galoy_user_id.as_ref() + ) + .execute(&mut **tx) + .await?; + Ok(()) + } + + pub async fn list_ids_to_notify_after( + &self, + search_id: GaloyUserId, + ) -> Result<(Vec, bool), EmailReminderProjectionError> { + let last_transaction_at_threshold = Utc::now() + - Duration::try_days(Self::ACCOUNT_LIVENESS_THRESHOLD_DAYS) + .expect("Should be valid duration"); + let user_first_seen_at_threshold = Utc::now() + - Duration::try_days(Self::ACCOUNT_AGED_THRESHOLD_DAYS) + .expect("Should be valid duration"); + let last_notified_at_threshold = Utc::now() + - Duration::try_days(Self::NOTIFICATION_COOL_OFF_THRESHOLD_DAYS) + .expect("Should be valid duration"); + + let rows = sqlx::query!( + r#"SELECT galoy_user_id + FROM email_reminder_projection + WHERE galoy_user_id > $1 AND last_transaction_at IS NOT NULL AND last_transaction_at > $2 AND user_first_seen_at < $3 AND (last_notified_at IS NULL OR last_notified_at < $4) + ORDER BY galoy_user_id + LIMIT $5"#, + search_id.as_ref(), + last_transaction_at_threshold, + user_first_seen_at_threshold, + last_notified_at_threshold, + PAGINATION_BATCH_SIZE + 1, + ) + .fetch_all(&self._pool) + .await?; + + let more = rows.len() > PAGINATION_BATCH_SIZE as usize; + + Ok(( + rows.into_iter() + .take(PAGINATION_BATCH_SIZE as usize) + .map(|r| GaloyUserId::from(r.galoy_user_id)) + .collect(), + more, + )) + } } diff --git a/core/notifications/src/grpc/server/mod.rs b/core/notifications/src/grpc/server/mod.rs index 011dd9de4f0..6b50f26f720 100644 --- a/core/notifications/src/grpc/server/mod.rs +++ b/core/notifications/src/grpc/server/mod.rs @@ -393,7 +393,7 @@ impl NotificationsService for Notifications { .map_err(|e| Status::invalid_argument(e.to_string()))?; let user_id = GaloyUserId::from(user_id); self.app - .handle_single_user_event( + .handle_transaction_occurred_event( user_id, notification_event::TransactionOccurred { transaction_type, diff --git a/core/notifications/src/job/mod.rs b/core/notifications/src/job/mod.rs index e7533dce372..363d1dcfd8e 100644 --- a/core/notifications/src/job/mod.rs +++ b/core/notifications/src/job/mod.rs @@ -98,24 +98,27 @@ async fn link_email_reminder( .execute(|data| async move { let data: LinkEmailReminderData = data.expect("no LinkEmailReminderData available"); let (ids, more) = email_reminder_projection - .list_ids_after(&data.search_id) this needs to be replaced with new query + .list_ids_to_notify_after(data.search_id.clone()) .await?; let mut tx = pool.begin().await?; + if more { let data = LinkEmailReminderData { search_id: ids.last().expect("there should always be an id").clone(), - payload: data.payload.clone(), tracing_data: tracing::extract_tracing_data(), }; spawn_link_email_reminder(&pool, data).await?; } for user_id in ids { - let payload = data.payload.clone(); + let payload = NotificationEventPayload::from(LinkEmailReminder {}); if payload.should_send_email() { spawn_send_email_notification(&mut tx, (user_id.clone(), payload.clone())) .await?; } - spawn_send_push_notification(&mut tx, (user_id, payload)).await?; + spawn_send_push_notification(&mut tx, (user_id.clone(), payload.clone())).await?; + email_reminder_projection + .user_notified(&mut tx, user_id.clone()) + .await?; } Ok::<_, JobError>(JobResult::CompleteWithTx(tx)) }) @@ -246,6 +249,7 @@ pub async fn spawn_link_email_reminder( data: impl Into, ) -> Result<(), JobError> { let data = data.into(); + // TODO: I think reusing the id here might cause a bug match JobBuilder::new_with_id(LINK_EMAIL_REMINDER_ID, "link_email_reminder") .set_channel_name("link_email_reminder") .set_json(&data) @@ -322,16 +326,14 @@ impl From for AllUserEventDispatchData { #[derive(Debug, Serialize, Deserialize)] pub(super) struct LinkEmailReminderData { search_id: GaloyUserId, - payload: NotificationEventPayload, #[serde(flatten)] pub(super) tracing_data: HashMap, } -impl From for LinkEmailReminderData { - fn from(payload: NotificationEventPayload) -> Self { +impl From<()> for LinkEmailReminderData { + fn from(_: ()) -> Self { Self { search_id: GaloyUserId::search_begin(), - payload, tracing_data: tracing::extract_tracing_data(), } }