diff --git a/Cargo.lock b/Cargo.lock index 538cda119c..c2520fb9df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -854,6 +854,7 @@ dependencies = [ name = "es-entity" version = "0.1.0" dependencies = [ + "chrono", "derive_builder", "serde", "serde_json", diff --git a/core/notifications/.sqlx/query-1f1b7039f03d2b42a88d13ee0394aac08c81a41e828eaa94a29e277177822fc6.json b/core/notifications/.sqlx/query-1f1b7039f03d2b42a88d13ee0394aac08c81a41e828eaa94a29e277177822fc6.json new file mode 100644 index 0000000000..fb93d2379d --- /dev/null +++ b/core/notifications/.sqlx/query-1f1b7039f03d2b42a88d13ee0394aac08c81a41e828eaa94a29e277177822fc6.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO email_reminder_projection\n (galoy_user_id, last_transaction_at) VALUES ($1, now())\n ON CONFLICT (galoy_user_id) DO UPDATE\n SET last_transaction_at = now()", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Varchar" + ] + }, + "nullable": [] + }, + "hash": "1f1b7039f03d2b42a88d13ee0394aac08c81a41e828eaa94a29e277177822fc6" +} diff --git a/core/notifications/.sqlx/query-38788b6bfb3d437147ea78bcada68610b5278cb86bd649474d4eb663e98f4497.json b/core/notifications/.sqlx/query-38788b6bfb3d437147ea78bcada68610b5278cb86bd649474d4eb663e98f4497.json deleted file mode 100644 index 0502a54422..0000000000 --- a/core/notifications/.sqlx/query-38788b6bfb3d437147ea78bcada68610b5278cb86bd649474d4eb663e98f4497.json +++ /dev/null @@ -1,34 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT a.id, e.sequence, e.event\n FROM user_notification_settings a\n JOIN user_notification_settings_events e ON a.id = e.id\n WHERE a.galoy_user_id = $1\n ORDER BY e.sequence", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id", - "type_info": "Uuid" - }, - { - "ordinal": 1, - "name": "sequence", - "type_info": "Int4" - }, - { - "ordinal": 2, - "name": "event", - "type_info": "Jsonb" - } - ], - "parameters": { - "Left": [ - "Text" - ] - }, - "nullable": [ - false, - false, - false - ] - }, - "hash": "38788b6bfb3d437147ea78bcada68610b5278cb86bd649474d4eb663e98f4497" -} diff --git a/core/notifications/.sqlx/query-47ede6d0750bde9e9f595f603ee7baf7ba337db2f1707805e8fa36f04ca0360e.json b/core/notifications/.sqlx/query-47ede6d0750bde9e9f595f603ee7baf7ba337db2f1707805e8fa36f04ca0360e.json new file mode 100644 index 0000000000..5033ef8361 --- /dev/null +++ b/core/notifications/.sqlx/query-47ede6d0750bde9e9f595f603ee7baf7ba337db2f1707805e8fa36f04ca0360e.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "WITH selected_rows AS (\n SELECT galoy_user_id\n FROM email_reminder_projection\n WHERE galoy_user_id > $1\n AND last_transaction_at IS NOT NULL\n AND last_transaction_at > (NOW() - make_interval(mins => $2))\n AND user_first_seen_at < (NOW() - make_interval(mins => $3))\n AND (last_notified_at IS NULL OR last_notified_at < (NOW() - make_interval(mins => $4)))\n ORDER BY galoy_user_id\n LIMIT $5\n ),\n updated AS (\n UPDATE email_reminder_projection\n SET last_notified_at = NOW()\n FROM selected_rows\n WHERE email_reminder_projection.galoy_user_id = selected_rows.galoy_user_id\n RETURNING email_reminder_projection.galoy_user_id\n )\n SELECT galoy_user_id\n FROM updated\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "galoy_user_id", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Text", + "Int4", + "Int4", + "Int4", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "47ede6d0750bde9e9f595f603ee7baf7ba337db2f1707805e8fa36f04ca0360e" +} diff --git a/core/notifications/.sqlx/query-533ec09be1426e2330214ddb09a74e059574296dc8e7c5e6492e0d60c739935d.json b/core/notifications/.sqlx/query-533ec09be1426e2330214ddb09a74e059574296dc8e7c5e6492e0d60c739935d.json new file mode 100644 index 0000000000..be1a5482a9 --- /dev/null +++ b/core/notifications/.sqlx/query-533ec09be1426e2330214ddb09a74e059574296dc8e7c5e6492e0d60c739935d.json @@ -0,0 +1,46 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT a.id, e.sequence, e.event,\n a.created_at AS entity_created_at, e.recorded_at AS event_recorded_at\n FROM user_notification_settings a\n JOIN user_notification_settings_events e ON a.id = e.id\n WHERE a.galoy_user_id = $1\n ORDER BY e.sequence", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "sequence", + "type_info": "Int4" + }, + { + "ordinal": 2, + "name": "event", + "type_info": "Jsonb" + }, + { + "ordinal": 3, + "name": "entity_created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 4, + "name": "event_recorded_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "533ec09be1426e2330214ddb09a74e059574296dc8e7c5e6492e0d60c739935d" +} diff --git a/core/notifications/.sqlx/query-baf183050fbbad59db8c7ee26f900b9e481bcf530c668e32b61ce8bf25a2a385.json b/core/notifications/.sqlx/query-baf183050fbbad59db8c7ee26f900b9e481bcf530c668e32b61ce8bf25a2a385.json new file mode 100644 index 0000000000..99cda8296c --- /dev/null +++ b/core/notifications/.sqlx/query-baf183050fbbad59db8c7ee26f900b9e481bcf530c668e32b61ce8bf25a2a385.json @@ -0,0 +1,47 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT a.id, e.sequence, e.event,\n a.created_at AS entity_created_at, e.recorded_at AS event_recorded_at\n FROM user_notification_settings a\n JOIN user_notification_settings_events e ON a.id = e.id\n WHERE galoy_user_id > $1\n ORDER BY galoy_user_id, e.sequence\n LIMIT $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "sequence", + "type_info": "Int4" + }, + { + "ordinal": 2, + "name": "event", + "type_info": "Jsonb" + }, + { + "ordinal": 3, + "name": "entity_created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 4, + "name": "event_recorded_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Text", + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "baf183050fbbad59db8c7ee26f900b9e481bcf530c668e32b61ce8bf25a2a385" +} diff --git a/core/notifications/.sqlx/query-d05bfed34ba510df8ab1913377c36cdba1db9f4be114ba666008ad2732d479b5.json b/core/notifications/.sqlx/query-d05bfed34ba510df8ab1913377c36cdba1db9f4be114ba666008ad2732d479b5.json new file mode 100644 index 0000000000..bb6c4a47c8 --- /dev/null +++ b/core/notifications/.sqlx/query-d05bfed34ba510df8ab1913377c36cdba1db9f4be114ba666008ad2732d479b5.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM email_reminder_projection\n WHERE galoy_user_id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [] + }, + "hash": "d05bfed34ba510df8ab1913377c36cdba1db9f4be114ba666008ad2732d479b5" +} diff --git a/core/notifications/locales/en.yml b/core/notifications/locales/en.yml index bf5bc96038..7affcb4a4d 100644 --- a/core/notifications/locales/en.yml +++ b/core/notifications/locales/en.yml @@ -60,3 +60,6 @@ transaction.paid_invoice.body_display_currency: "+%{displayCurrencyAmount} | %{f price_changed.title: "Bitcoin is on the move!" price_changed.body: "Bitcoin is up %{percent_increase}% in the last day to %{price}!" + +security.link_email_reminder.title: "Link Email to Secure Account" +security.link_email_reminder.body: "Link your email to secure your account and receive important updates" diff --git a/core/notifications/migrations/20240321091909_create_email_reminder_projection.down.sql b/core/notifications/migrations/20240321091909_create_email_reminder_projection.down.sql new file mode 100644 index 0000000000..d2f607c5b8 --- /dev/null +++ b/core/notifications/migrations/20240321091909_create_email_reminder_projection.down.sql @@ -0,0 +1 @@ +-- Add down migration script here diff --git a/core/notifications/migrations/20240321091909_create_email_reminder_projection.up.sql b/core/notifications/migrations/20240321091909_create_email_reminder_projection.up.sql new file mode 100644 index 0000000000..ab66edd8da --- /dev/null +++ b/core/notifications/migrations/20240321091909_create_email_reminder_projection.up.sql @@ -0,0 +1,7 @@ +CREATE TABLE email_reminder_projection ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + galoy_user_id VARCHAR UNIQUE NOT NULL, + user_first_seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + last_transaction_at TIMESTAMPTZ, + last_notified_at TIMESTAMPTZ +); diff --git a/core/notifications/notifications.yml b/core/notifications/notifications.yml index 2804ec13ad..249d13f1ad 100644 --- a/core/notifications/notifications.yml +++ b/core/notifications/notifications.yml @@ -6,6 +6,12 @@ # grpc_server: # port: 6685 app: + # jobs: + # link_email_reminder_delay: 5 + # email_reminder_projection: + # account_liveness_threshold_minutes: 30240 + # account_aged_threshold_minutes: 30240 + # notification_cool_off_threshold_minutes: 129600 push_executor: fcm: google_application_credentials_path: "./config/notifications/fake_service_account.json" diff --git a/core/notifications/src/app/config.rs b/core/notifications/src/app/config.rs index 4f104bf790..ee3104913b 100644 --- a/core/notifications/src/app/config.rs +++ b/core/notifications/src/app/config.rs @@ -1,10 +1,17 @@ use serde::{Deserialize, Serialize}; -use crate::{email_executor::EmailExecutorConfig, push_executor::PushExecutorConfig}; +use crate::{ + email_executor::EmailExecutorConfig, email_reminder_projection::EmailReminderProjectionConfig, + job::JobsConfig, push_executor::PushExecutorConfig, +}; #[derive(Clone, Default, Serialize, Deserialize)] pub struct AppConfig { pub push_executor: PushExecutorConfig, #[serde(default)] pub email_executor: EmailExecutorConfig, + #[serde(default)] + pub jobs: JobsConfig, + #[serde(default)] + pub email_reminder_projection: EmailReminderProjectionConfig, } diff --git a/core/notifications/src/app/error.rs b/core/notifications/src/app/error.rs index 0b6cc8a60d..06be85fee8 100644 --- a/core/notifications/src/app/error.rs +++ b/core/notifications/src/app/error.rs @@ -1,7 +1,8 @@ use thiserror::Error; use crate::{ - email_executor::error::EmailExecutorError, job::error::JobError, + email_executor::error::EmailExecutorError, + email_reminder_projection::error::EmailReminderProjectionError, job::error::JobError, notification_cool_off_tracker::NotificationCoolOffTrackerError, push_executor::error::PushExecutorError, user_notification_settings::error::UserNotificationSettingsError, @@ -14,6 +15,8 @@ pub enum ApplicationError { #[error("{0}")] UserNotificationSettingsError(#[from] UserNotificationSettingsError), #[error("{0}")] + EmailReminderProjectionError(#[from] EmailReminderProjectionError), + #[error("{0}")] JobError(#[from] JobError), #[error("{0}")] PushExecutorError(#[from] PushExecutorError), diff --git a/core/notifications/src/app/mod.rs b/core/notifications/src/app/mod.rs index c3c3dfde8e..0193b4ac0c 100644 --- a/core/notifications/src/app/mod.rs +++ b/core/notifications/src/app/mod.rs @@ -9,6 +9,7 @@ use std::sync::Arc; use crate::{ email_executor::EmailExecutor, + email_reminder_projection::EmailReminderProjection, job::{self}, notification_cool_off_tracker::*, notification_event::*, @@ -24,6 +25,7 @@ use error::*; pub struct NotificationsApp { _config: AppConfig, settings: UserNotificationSettingsRepo, + email_reminder_projection: EmailReminderProjection, pool: Pool, _runner: Arc, } @@ -34,12 +36,26 @@ impl NotificationsApp { let push_executor = PushExecutor::init(config.push_executor.clone(), settings.clone()).await?; let email_executor = EmailExecutor::init(config.email_executor.clone(), settings.clone())?; - let runner = - job::start_job_runner(&pool, push_executor, email_executor, settings.clone()).await?; + let email_reminder_projection = + EmailReminderProjection::new(&pool, config.email_reminder_projection.clone()); + let runner = job::start_job_runner( + &pool, + push_executor, + email_executor, + settings.clone(), + email_reminder_projection.clone(), + ) + .await?; + Self::spawn_kickoff_link_email_reminder( + pool.clone(), + config.jobs.kickoff_link_email_remainder_delay, + ) + .await?; Ok(Self { _config: config, pool, settings, + email_reminder_projection, _runner: Arc::new(runner), }) } @@ -164,7 +180,14 @@ impl NotificationsApp { ) -> Result<(), ApplicationError> { let mut user_settings = self.settings.find_for_user_id(&user_id).await?; user_settings.update_email_address(addr); - self.settings.persist(&mut user_settings).await?; + let mut tx = self.pool.begin().await?; + self.settings + .persist_in_tx(&mut tx, &mut user_settings) + .await?; + self.email_reminder_projection + .user_added_email(&mut tx, user_id) + .await?; + tx.commit().await?; Ok(()) } @@ -195,6 +218,22 @@ impl NotificationsApp { Ok(()) } + #[instrument(name = "app.handle_transaction_occurred_event", skip(self), err)] + pub async fn handle_transaction_occurred_event( + &self, + user_id: GaloyUserId, + transaction_occurred: TransactionOccurred, + ) -> Result<(), ApplicationError> { + let user_settings = self.settings.find_for_user_id(&user_id).await?; + if user_settings.email_address().is_none() { + self.email_reminder_projection + .transaction_occurred_for_user_without_email(&user_id) + .await?; + } + self.handle_single_user_event(user_id, transaction_occurred) + .await + } + #[instrument(name = "app.handle_price_changed_event", skip(self), err)] pub async fn handle_price_changed_event( &self, @@ -239,4 +278,27 @@ impl NotificationsApp { tx.commit().await?; Ok(()) } + + #[instrument( + name = "app.kickoff_link_email_reminder", + level = "trace", + skip_all, + err + )] + async fn spawn_kickoff_link_email_reminder( + pool: sqlx::PgPool, + delay: std::time::Duration, + ) -> Result<(), ApplicationError> { + tokio::spawn(async move { + loop { + let _ = job::spawn_kickoff_link_email_reminder( + &pool, + std::time::Duration::from_secs(1), + ) + .await; + tokio::time::sleep(delay).await; + } + }); + Ok(()) + } } diff --git a/core/notifications/src/email_reminder_projection/config.rs b/core/notifications/src/email_reminder_projection/config.rs new file mode 100644 index 0000000000..1edcb1b0a1 --- /dev/null +++ b/core/notifications/src/email_reminder_projection/config.rs @@ -0,0 +1,34 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct EmailReminderProjectionConfig { + #[serde(default = "default_account_liveness_threshold_minutes")] + pub account_liveness_threshold_minutes: i32, + #[serde(default = "default_account_aged_threshold_minutes")] + pub account_aged_threshold_minutes: i32, + #[serde(default = "default_notification_cool_off_threshold_minutes")] + pub notification_cool_off_threshold_minutes: i32, +} + +impl Default for EmailReminderProjectionConfig { + fn default() -> Self { + Self { + account_liveness_threshold_minutes: default_account_liveness_threshold_minutes(), + account_aged_threshold_minutes: default_account_aged_threshold_minutes(), + notification_cool_off_threshold_minutes: + default_notification_cool_off_threshold_minutes(), + } + } +} + +fn default_account_liveness_threshold_minutes() -> i32 { + 21 * 24 * 60 // 21 days +} + +fn default_account_aged_threshold_minutes() -> i32 { + 21 * 24 * 60 // 21 days +} + +fn default_notification_cool_off_threshold_minutes() -> i32 { + 90 * 24 * 60 // 90 days +} diff --git a/core/notifications/src/email_reminder_projection/error.rs b/core/notifications/src/email_reminder_projection/error.rs new file mode 100644 index 0000000000..8447847fef --- /dev/null +++ b/core/notifications/src/email_reminder_projection/error.rs @@ -0,0 +1,11 @@ +use thiserror::Error; + +use crate::user_notification_settings::error::*; + +#[derive(Debug, Error)] +pub enum EmailReminderProjectionError { + #[error("EmailReminderProjection - Sqlx: {0}")] + Sqlx(#[from] sqlx::Error), + #[error("EmailReminderProjection - UserNotificationSettings: {0}")] + UserNotificationSettings(#[from] UserNotificationSettingsError), +} diff --git a/core/notifications/src/email_reminder_projection/mod.rs b/core/notifications/src/email_reminder_projection/mod.rs new file mode 100644 index 0000000000..18b955a4b3 --- /dev/null +++ b/core/notifications/src/email_reminder_projection/mod.rs @@ -0,0 +1,104 @@ +mod config; +pub mod error; + +use sqlx::PgPool; + +use crate::primitives::*; + +pub use config::*; +use error::*; + +#[derive(Debug, Clone)] +pub struct EmailReminderProjection { + pool: PgPool, + config: EmailReminderProjectionConfig, +} + +const PAGINATION_BATCH_SIZE: i64 = 1000; + +impl EmailReminderProjection { + pub fn new(pool: &PgPool, config: EmailReminderProjectionConfig) -> Self { + Self { + pool: pool.clone(), + config, + } + } + + pub async fn transaction_occurred_for_user_without_email( + &self, + galoy_user_id: &GaloyUserId, + ) -> Result<(), EmailReminderProjectionError> { + sqlx::query!( + r#"INSERT INTO email_reminder_projection + (galoy_user_id, last_transaction_at) VALUES ($1, now()) + ON CONFLICT (galoy_user_id) DO UPDATE + SET last_transaction_at = now()"#, + galoy_user_id.as_ref(), + ) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn user_added_email( + &self, + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + galoy_user_id: GaloyUserId, + ) -> Result<(), EmailReminderProjectionError> { + sqlx::query!( + r#"DELETE FROM email_reminder_projection + WHERE galoy_user_id = $1"#, + galoy_user_id.as_ref() + ) + .execute(&mut **tx) + .await?; + Ok(()) + } + + pub async fn list_ids_to_notify_after( + &self, + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + search_id: GaloyUserId, + ) -> Result<(Vec, bool), EmailReminderProjectionError> { + let rows = sqlx::query!( + r#"WITH selected_rows AS ( + SELECT galoy_user_id + FROM email_reminder_projection + WHERE galoy_user_id > $1 + AND last_transaction_at IS NOT NULL + AND last_transaction_at > (NOW() - make_interval(mins => $2)) + AND user_first_seen_at < (NOW() - make_interval(mins => $3)) + AND (last_notified_at IS NULL OR last_notified_at < (NOW() - make_interval(mins => $4))) + ORDER BY galoy_user_id + LIMIT $5 + ), + updated AS ( + UPDATE email_reminder_projection + SET last_notified_at = NOW() + FROM selected_rows + WHERE email_reminder_projection.galoy_user_id = selected_rows.galoy_user_id + RETURNING email_reminder_projection.galoy_user_id + ) + SELECT galoy_user_id + FROM updated + "#, + search_id.as_ref(), + self.config.account_liveness_threshold_minutes, + self.config.account_aged_threshold_minutes, + self.config.notification_cool_off_threshold_minutes, + PAGINATION_BATCH_SIZE + 1, + ) + .fetch_all(&mut **tx) + .await?; + + let more = rows.len() > PAGINATION_BATCH_SIZE as usize; + + Ok(( + rows.into_iter() + .take(PAGINATION_BATCH_SIZE as usize) + .map(|r| GaloyUserId::from(r.galoy_user_id)) + .collect(), + more, + )) + } +} diff --git a/core/notifications/src/grpc/server/convert.rs b/core/notifications/src/grpc/server/convert.rs index 0dc24059d7..0d96b9a487 100644 --- a/core/notifications/src/grpc/server/convert.rs +++ b/core/notifications/src/grpc/server/convert.rs @@ -74,6 +74,9 @@ impl From for proto::NotificationCategory { } UserNotificationCategory::Marketing => proto::NotificationCategory::Marketing, UserNotificationCategory::Price => proto::NotificationCategory::Price, + UserNotificationCategory::Security => { + unreachable!("should never match security to grpc category") + } } } } diff --git a/core/notifications/src/grpc/server/mod.rs b/core/notifications/src/grpc/server/mod.rs index 011dd9de4f..6b50f26f72 100644 --- a/core/notifications/src/grpc/server/mod.rs +++ b/core/notifications/src/grpc/server/mod.rs @@ -393,7 +393,7 @@ impl NotificationsService for Notifications { .map_err(|e| Status::invalid_argument(e.to_string()))?; let user_id = GaloyUserId::from(user_id); self.app - .handle_single_user_event( + .handle_transaction_occurred_event( user_id, notification_event::TransactionOccurred { transaction_type, diff --git a/core/notifications/src/job/config.rs b/core/notifications/src/job/config.rs new file mode 100644 index 0000000000..a6602ab086 --- /dev/null +++ b/core/notifications/src/job/config.rs @@ -0,0 +1,22 @@ +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde_with::serde_as] +pub struct JobsConfig { + #[serde_as(as = "serde_with::DurationSeconds")] + #[serde(default = "default_kickoff_link_email_reminder_delay")] + pub kickoff_link_email_remainder_delay: Duration, +} + +impl Default for JobsConfig { + fn default() -> Self { + Self { + kickoff_link_email_remainder_delay: default_kickoff_link_email_reminder_delay(), + } + } +} + +fn default_kickoff_link_email_reminder_delay() -> Duration { + Duration::from_secs(60 * 60 * 6) // Every 6 hours +} diff --git a/core/notifications/src/job/error.rs b/core/notifications/src/job/error.rs index c291cac807..fc2178e08c 100644 --- a/core/notifications/src/job/error.rs +++ b/core/notifications/src/job/error.rs @@ -1,8 +1,10 @@ use thiserror::Error; use crate::{ - email_executor::error::EmailExecutorError, push_executor::error::PushExecutorError, - user_notification_settings::error::*, + email_executor::error::EmailExecutorError, + email_reminder_projection::error::EmailReminderProjectionError, + push_executor::error::PushExecutorError, + user_notification_settings::error::UserNotificationSettingsError, }; #[derive(Error, Debug)] @@ -15,6 +17,8 @@ pub enum JobError { PushExecutor(#[from] PushExecutorError), #[error("JobError - EmailExecutorError: {0}")] EmailExecutor(#[from] EmailExecutorError), + #[error("JobError - EmailReminderProjection: {0}")] + EmailReminderProjection(#[from] EmailReminderProjectionError), } impl job_executor::JobExecutionError for JobError {} diff --git a/core/notifications/src/job/mod.rs b/core/notifications/src/job/mod.rs index 9d4d6a0cf3..b46d7b34c7 100644 --- a/core/notifications/src/job/mod.rs +++ b/core/notifications/src/job/mod.rs @@ -1,41 +1,49 @@ +mod config; mod send_email_notification; mod send_push_notification; pub mod error; use serde::{Deserialize, Serialize}; -use sqlxmq::{job, CurrentJob, JobRegistry, JobRunnerHandle}; +use sqlxmq::{job, CurrentJob, JobBuilder, JobRegistry, JobRunnerHandle}; use tracing::instrument; +use uuid::{uuid, Uuid}; use std::collections::HashMap; use job_executor::{JobExecutor, JobResult}; use crate::{ - email_executor::EmailExecutor, notification_event::*, primitives::GaloyUserId, - push_executor::PushExecutor, user_notification_settings::*, + email_executor::EmailExecutor, email_reminder_projection::*, notification_event::*, + primitives::GaloyUserId, push_executor::PushExecutor, user_notification_settings::*, }; +pub use config::*; use error::JobError; - use send_email_notification::SendEmailNotificationData; use send_push_notification::SendPushNotificationData; +const KICKOFF_LINK_EMAIL_REMINDER_ID: Uuid = uuid!("00000000-0000-0000-0000-000000000001"); + pub async fn start_job_runner( pool: &sqlx::PgPool, push_executor: PushExecutor, email_executor: EmailExecutor, settings: UserNotificationSettingsRepo, + email_reminder_projection: EmailReminderProjection, ) -> Result { let mut registry = JobRegistry::new(&[ all_user_event_dispatch, send_push_notification, send_email_notification, multi_user_event_dispatch, + kickoff_link_email_reminder, + link_email_reminder, ]); registry.set_context(push_executor); registry.set_context(email_executor); registry.set_context(settings); + registry.set_context(email_reminder_projection); Ok(registry.runner(pool).set_keep_alive(false).run().await?) } @@ -79,6 +87,72 @@ async fn all_user_event_dispatch( Ok(()) } +#[job(name = "link_email_reminder", channel_name = "link_email_reminder")] +async fn link_email_reminder( + mut current_job: CurrentJob, + email_reminder_projection: EmailReminderProjection, +) -> Result<(), JobError> { + let pool = current_job.pool().clone(); + JobExecutor::builder(&mut current_job) + .build() + .expect("couldn't build JobExecutor") + .execute(|data| async move { + let data: LinkEmailReminderData = data.expect("no LinkEmailReminderData available"); + let mut tx = pool.begin().await?; + let (ids, more) = email_reminder_projection + .list_ids_to_notify_after(&mut tx, data.search_id.clone()) + .await?; + + if more { + let data = LinkEmailReminderData { + search_id: ids.last().expect("there should always be an id").clone(), + tracing_data: tracing::extract_tracing_data(), + }; + spawn_link_email_reminder(&mut tx, data).await?; + } + for user_id in ids { + let payload = NotificationEventPayload::from(LinkEmailReminder {}); + if payload.should_send_email() { + spawn_send_email_notification(&mut tx, (user_id.clone(), payload.clone())) + .await?; + } + spawn_send_push_notification(&mut tx, (user_id.clone(), payload.clone())).await?; + } + Ok::<_, JobError>(JobResult::CompleteWithTx(tx)) + }) + .await?; + Ok(()) +} + +#[job( + name = "kickoff_link_email_reminder", + channel_name = "link_email_reminder" +)] +async fn kickoff_link_email_reminder( + mut current_job: CurrentJob, + JobsConfig { + kickoff_link_email_remainder_delay, + }: JobsConfig, +) -> Result<(), JobError> { + let pool = current_job.pool().clone(); + JobExecutor::builder(&mut current_job) + .build() + .expect("couldn't build JobExecutor") + .execute(|_: Option<()>| async move { + let data = LinkEmailReminderData { + search_id: GaloyUserId::search_begin(), + tracing_data: tracing::extract_tracing_data(), + }; + let mut tx = pool.begin().await?; + spawn_link_email_reminder(&mut tx, data).await?; + Ok::<_, JobError>(JobResult::CompleteWithTx(tx)) + }) + .await?; + spawn_kickoff_link_email_reminder(current_job.pool(), kickoff_link_email_remainder_delay) + .await?; + Ok(()) +} + #[job( name = "multi_user_event_dispatch", channel_name = "multi_user_event_dispatch" @@ -196,6 +270,48 @@ pub async fn spawn_multi_user_event_dispatch( Ok(()) } +#[instrument(name = "job.spawn_link_email_reminder", skip_all, fields(error, error.level, error.message), err)] +pub async fn spawn_link_email_reminder( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + data: impl Into, +) -> Result<(), JobError> { + let data = data.into(); + if let Err(e) = link_email_reminder + .builder() + .set_json(&data) + .expect("Couldn't set json") + .spawn(&mut **tx) + .await + { + tracing::insert_error_fields(tracing::Level::WARN, &e); + return Err(e.into()); + } + Ok(()) +} + +#[instrument(name = "job.spawn_kickoff_link_email_reminder", skip_all, fields(error, error.level, error.message), err)] +pub async fn spawn_kickoff_link_email_reminder( + pool: &sqlx::PgPool, + duration: std::time::Duration, +) -> Result<(), JobError> { + match JobBuilder::new_with_id( + KICKOFF_LINK_EMAIL_REMINDER_ID, + "kickoff_link_email_reminder", + ) + .set_channel_name("link_email_reminder") + .set_delay(duration) + .spawn(pool) + .await + { + Err(sqlx::Error::Database(err)) if err.message().contains("duplicate key") => Ok(()), + Err(e) => { + tracing::insert_error_fields(tracing::Level::ERROR, &e); + Err(e.into()) + } + Ok(_) => Ok(()), + } +} + #[job( name = "send_email_notification", channel_name = "send_email_notification" @@ -246,13 +362,29 @@ pub(super) struct AllUserEventDispatchData { impl From for AllUserEventDispatchData { fn from(payload: NotificationEventPayload) -> Self { Self { - search_id: GaloyUserId::from(String::new()), + search_id: GaloyUserId::search_begin(), payload, tracing_data: tracing::extract_tracing_data(), } } } +#[derive(Debug, Serialize, Deserialize)] +pub(super) struct LinkEmailReminderData { + search_id: GaloyUserId, + #[serde(flatten)] + pub(super) tracing_data: HashMap, +} + +impl From<()> for LinkEmailReminderData { + fn from(_: ()) -> Self { + Self { + search_id: GaloyUserId::search_begin(), + tracing_data: tracing::extract_tracing_data(), + } + } +} + #[derive(Debug, Serialize, Deserialize)] pub(super) struct MultiUserEventDispatchData { user_ids: Vec, diff --git a/core/notifications/src/lib.rs b/core/notifications/src/lib.rs index 45e2e325fc..39a8677b15 100644 --- a/core/notifications/src/lib.rs +++ b/core/notifications/src/lib.rs @@ -10,6 +10,7 @@ mod messages; pub mod cli; pub mod email_executor; +pub mod email_reminder_projection; pub mod graphql; pub mod grpc; pub mod notification_cool_off_tracker; diff --git a/core/notifications/src/notification_event/link_email_reminder.rs b/core/notifications/src/notification_event/link_email_reminder.rs new file mode 100644 index 0000000000..d88e216962 --- /dev/null +++ b/core/notifications/src/notification_event/link_email_reminder.rs @@ -0,0 +1,56 @@ +use rust_i18n::t; +use serde::{Deserialize, Serialize}; + +use super::{DeepLink, NotificationEvent}; +use crate::{messages::*, primitives::*}; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct LinkEmailReminder {} + +impl NotificationEvent for LinkEmailReminder { + fn category(&self) -> UserNotificationCategory { + UserNotificationCategory::Security + } + + fn deep_link(&self) -> Option { + None + } + + fn to_localized_push_msg(&self, locale: GaloyLocale) -> LocalizedPushMessage { + let title = t!( + "security.link_email_reminder.title", + locale = locale.as_ref() + ) + .to_string(); + let body = t!( + "security.link_email_reminder.body", + locale = locale.as_ref(), + ) + .to_string(); + LocalizedPushMessage { title, body } + } + + fn to_localized_email(&self, _locale: GaloyLocale) -> Option { + None + } + + fn should_send_email(&self) -> bool { + false + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn push_msg_correctly_formatted() { + let event = LinkEmailReminder {}; + let localized_message = event.to_localized_push_msg(GaloyLocale::from("en".to_string())); + assert_eq!(localized_message.title, "Link Email to Secure Account"); + assert_eq!( + localized_message.body, + "Link your email to secure your account and receive important updates" + ); + } +} diff --git a/core/notifications/src/notification_event/mod.rs b/core/notifications/src/notification_event/mod.rs index de1d835628..66f7ef5ad2 100644 --- a/core/notifications/src/notification_event/mod.rs +++ b/core/notifications/src/notification_event/mod.rs @@ -3,6 +3,7 @@ mod circle_threshold_reached; mod identity_verification_approved; mod identity_verification_declined; mod identity_verification_review_started; +mod link_email_reminder; mod marketing_notification_triggered; mod price_changed; mod transaction_occurred; @@ -16,6 +17,7 @@ pub(super) use circle_threshold_reached::*; pub(super) use identity_verification_approved::*; pub(super) use identity_verification_declined::*; pub(super) use identity_verification_review_started::*; +pub(super) use link_email_reminder::*; pub(super) use marketing_notification_triggered::*; pub(super) use price_changed::*; pub(super) use transaction_occurred::*; @@ -48,6 +50,7 @@ pub enum NotificationEventPayload { TransactionOccurred(TransactionOccurred), PriceChanged(PriceChanged), MarketingNotificationTriggered(MarketingNotificationTriggered), + LinkEmailReminder(LinkEmailReminder), } impl AsRef for NotificationEventPayload { @@ -61,6 +64,7 @@ impl AsRef for NotificationEventPayload { NotificationEventPayload::TransactionOccurred(event) => event, NotificationEventPayload::PriceChanged(event) => event, NotificationEventPayload::MarketingNotificationTriggered(event) => event, + NotificationEventPayload::LinkEmailReminder(event) => event, } } } @@ -120,3 +124,9 @@ impl From for NotificationEventPayload { NotificationEventPayload::MarketingNotificationTriggered(event) } } + +impl From for NotificationEventPayload { + fn from(event: LinkEmailReminder) -> Self { + NotificationEventPayload::LinkEmailReminder(event) + } +} diff --git a/core/notifications/src/primitives.rs b/core/notifications/src/primitives.rs index 3de8917347..139aa2b839 100644 --- a/core/notifications/src/primitives.rs +++ b/core/notifications/src/primitives.rs @@ -3,6 +3,10 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct GaloyUserId(String); impl GaloyUserId { + pub fn search_begin() -> Self { + GaloyUserId(String::new()) + } + pub fn into_inner(self) -> String { self.0 } @@ -120,6 +124,7 @@ pub enum UserNotificationCategory { AdminNotification, Marketing, Price, + Security, } #[derive(Debug, Serialize, Deserialize, Clone)] diff --git a/core/notifications/src/user_notification_settings/entity.rs b/core/notifications/src/user_notification_settings/entity.rs index d742449692..9640e4e5ac 100644 --- a/core/notifications/src/user_notification_settings/entity.rs +++ b/core/notifications/src/user_notification_settings/entity.rs @@ -74,6 +74,14 @@ impl UserNotificationSettings { .expect("Could not create default") } + pub fn created_at(&self) -> Option> { + self.events.entity_first_persisted_at + } + + pub fn is_new(&self) -> bool { + self.created_at().is_none() + } + pub fn update_locale(&mut self, locale: GaloyLocale) { if self.locale().as_ref() != Some(&locale) { self.events diff --git a/core/notifications/src/user_notification_settings/repo.rs b/core/notifications/src/user_notification_settings/repo.rs index efcdfe7d46..211c15e78a 100644 --- a/core/notifications/src/user_notification_settings/repo.rs +++ b/core/notifications/src/user_notification_settings/repo.rs @@ -5,6 +5,8 @@ use es_entity::*; use super::{entity::*, error::*}; use crate::primitives::*; +const PAGINATION_BATCH_SIZE: i64 = 1000; + #[derive(Debug, Clone)] pub struct UserNotificationSettingsRepo { pool: PgPool, @@ -21,7 +23,8 @@ impl UserNotificationSettingsRepo { ) -> Result { let rows = sqlx::query_as!( GenericEvent, - r#"SELECT a.id, e.sequence, e.event + r#"SELECT a.id, e.sequence, e.event, + a.created_at AS entity_created_at, e.recorded_at AS event_recorded_at FROM user_notification_settings a JOIN user_notification_settings_events e ON a.id = e.id WHERE a.galoy_user_id = $1 @@ -38,21 +41,55 @@ impl UserNotificationSettingsRepo { Ok(res?) } + pub async fn list_after_id( + &self, + id: &GaloyUserId, + ) -> Result<(Vec, bool), UserNotificationSettingsError> { + let rows = sqlx::query_as!( + GenericEvent, + r#"SELECT a.id, e.sequence, e.event, + a.created_at AS entity_created_at, e.recorded_at AS event_recorded_at + FROM user_notification_settings a + JOIN user_notification_settings_events e ON a.id = e.id + WHERE galoy_user_id > $1 + ORDER BY galoy_user_id, e.sequence + LIMIT $2"#, + id.as_ref(), + PAGINATION_BATCH_SIZE + 1, + ) + .fetch_all(&self.pool) + .await?; + + Ok(EntityEvents::load_n::( + rows, + PAGINATION_BATCH_SIZE as usize, + )?) + } + pub async fn persist( &self, settings: &mut UserNotificationSettings, ) -> Result<(), UserNotificationSettingsError> { let mut tx = self.pool.begin().await?; + self.persist_in_tx(&mut tx, settings).await?; + tx.commit().await?; + Ok(()) + } + + pub async fn persist_in_tx( + &self, + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + settings: &mut UserNotificationSettings, + ) -> Result<(), UserNotificationSettingsError> { sqlx::query!( r#"INSERT INTO user_notification_settings (id, galoy_user_id) VALUES ($1, $2) ON CONFLICT DO NOTHING"#, settings.id as UserNotificationSettingsId, settings.galoy_user_id.as_ref(), ) - .execute(&mut *tx) + .execute(&mut **tx) .await?; - settings.events.persist(&mut tx).await?; - tx.commit().await?; + settings.events.persist(tx).await?; Ok(()) } @@ -60,7 +97,6 @@ impl UserNotificationSettingsRepo { &self, id: &GaloyUserId, ) -> Result<(Vec, bool), UserNotificationSettingsError> { - let batch_limit = 1000; let rows = sqlx::query!( r#"SELECT galoy_user_id FROM user_notification_settings @@ -68,14 +104,14 @@ impl UserNotificationSettingsRepo { ORDER BY galoy_user_id LIMIT $2"#, id.as_ref(), - batch_limit as i64 + 1, + PAGINATION_BATCH_SIZE + 1, ) .fetch_all(&self.pool) .await?; - let more = rows.len() > batch_limit; + let more = rows.len() > PAGINATION_BATCH_SIZE as usize; Ok(( rows.into_iter() - .take(batch_limit) + .take(PAGINATION_BATCH_SIZE as usize) .map(|r| GaloyUserId::from(r.galoy_user_id)) .collect(), more, diff --git a/core/notifications/subgraph/schema.graphql b/core/notifications/subgraph/schema.graphql index c76c76ea9d..8bfebec82d 100644 --- a/core/notifications/subgraph/schema.graphql +++ b/core/notifications/subgraph/schema.graphql @@ -37,6 +37,7 @@ enum UserNotificationCategory { ADMIN_NOTIFICATION MARKETING PRICE + SECURITY } enum UserNotificationChannel { diff --git a/dev/config/apollo-federation/supergraph.graphql b/dev/config/apollo-federation/supergraph.graphql index cfc339bc7e..cbf5e51468 100644 --- a/dev/config/apollo-federation/supergraph.graphql +++ b/dev/config/apollo-federation/supergraph.graphql @@ -2107,6 +2107,7 @@ enum UserNotificationCategory ADMIN_NOTIFICATION @join__enumValue(graph: NOTIFICATIONS) MARKETING @join__enumValue(graph: NOTIFICATIONS) PRICE @join__enumValue(graph: NOTIFICATIONS) + SECURITY @join__enumValue(graph: NOTIFICATIONS) } enum UserNotificationChannel diff --git a/lib/es-entity-rs/BUCK b/lib/es-entity-rs/BUCK index 6eebc8f655..4efee9f15a 100644 --- a/lib/es-entity-rs/BUCK +++ b/lib/es-entity-rs/BUCK @@ -9,6 +9,7 @@ galoy_rust_lib( "//third-party/rust:serde", "//third-party/rust:sqlx", "//third-party/rust:uuid", + "//third-party/rust:chrono", ], srcs = glob(["src/**/*.rs"]), env = { diff --git a/lib/es-entity-rs/Cargo.toml b/lib/es-entity-rs/Cargo.toml index a911803e3d..3c77c068e8 100644 --- a/lib/es-entity-rs/Cargo.toml +++ b/lib/es-entity-rs/Cargo.toml @@ -14,3 +14,4 @@ serde_json = { workspace = true } serde = { workspace = true } uuid = { workspace = true } sqlx = { workspace = true } +chrono = { workspace = true } diff --git a/lib/es-entity-rs/src/events.rs b/lib/es-entity-rs/src/events.rs index 79562bbb2f..d90e07009f 100644 --- a/lib/es-entity-rs/src/events.rs +++ b/lib/es-entity-rs/src/events.rs @@ -1,4 +1,5 @@ use serde::{de::DeserializeOwned, Serialize}; +use sqlx::Row; use super::error::EntityError; @@ -7,6 +8,8 @@ pub struct GenericEvent { pub id: uuid::Uuid, pub sequence: i32, pub event: serde_json::Value, + pub entity_created_at: chrono::DateTime, + pub event_recorded_at: chrono::DateTime, } pub trait EntityEvent: DeserializeOwned + Serialize { @@ -22,6 +25,8 @@ pub trait EsEntity: TryFrom::Event>, Error = Ent } pub struct EntityEvents { + pub entity_first_persisted_at: Option>, + latest_event_persisted_at: Option>, entity_id: ::EntityId, persisted_events: Vec, new_events: Vec, @@ -36,6 +41,8 @@ where initial_events: impl IntoIterator, ) -> Self { Self { + entity_first_persisted_at: None, + latest_event_persisted_at: None, entity_id: id, persisted_events: Vec::new(), new_events: initial_events.into_iter().collect(), @@ -78,8 +85,20 @@ where builder.push_bind(event_type); builder.push_bind(event_json); }); + query_builder.push("RETURNING recorded_at"); let query = query_builder.build(); - query.execute(&mut **tx).await?; + + let rows = query.fetch_all(&mut **tx).await?; + + let recorded_at: chrono::DateTime = rows + .last() + .map(|row| row.get::, _>("recorded_at")) + .expect("Could not get recorded_at"); + + self.latest_event_persisted_at = Some(recorded_at); + if self.entity_first_persisted_at.is_none() { + self.entity_first_persisted_at = Some(recorded_at); + } self.persisted_events.extend(events); Ok(n_persisted) @@ -94,6 +113,8 @@ where if current_id.is_none() { current_id = Some(e.id); current = Some(Self { + entity_first_persisted_at: Some(e.entity_created_at), + latest_event_persisted_at: None, entity_id: e.id.into(), persisted_events: Vec::new(), new_events: Vec::new(), @@ -102,10 +123,9 @@ where if current_id != Some(e.id) { break; } - current - .as_mut() - .expect("Could not get current") - .persisted_events + let cur = current.as_mut().expect("Could not get current"); + cur.latest_event_persisted_at = Some(e.event_recorded_at); + cur.persisted_events .push(serde_json::from_value(e.event).expect("Could not deserialize event")); } if let Some(current) = current { @@ -115,6 +135,42 @@ where } } + pub fn load_n>( + events: impl IntoIterator, + n: usize, + ) -> Result<(Vec, bool), EntityError> { + let mut ret: Vec = Vec::new(); + let mut current_id = None; + let mut current = None; + for e in events { + if current_id != Some(e.id) { + if let Some(current) = current.take() { + ret.push(E::try_from(current)?); + if ret.len() == n { + return Ok((ret, true)); + } + } + + current_id = Some(e.id); + current = Some(Self { + entity_first_persisted_at: Some(e.entity_created_at), + latest_event_persisted_at: None, + entity_id: e.id.into(), + persisted_events: Vec::new(), + new_events: Vec::new(), + }); + } + let cur = current.as_mut().expect("Could not get current"); + cur.latest_event_persisted_at = Some(e.event_recorded_at); + cur.persisted_events + .push(serde_json::from_value(e.event).expect("Could not deserialize event")); + } + if let Some(current) = current.take() { + ret.push(E::try_from(current)?); + } + Ok((ret, false)) + } + pub fn iter(&self) -> impl DoubleEndedIterator { self.persisted_events.iter().chain(self.new_events.iter()) } @@ -181,8 +237,36 @@ mod tests { sequence: 1, event: serde_json::to_value(DummyEvent::Created("dummy-name".to_owned())) .expect("Could not serialize"), + entity_created_at: chrono::Utc::now(), + event_recorded_at: chrono::Utc::now(), }]; let entity: DummyEntity = EntityEvents::load_first(generic_events).expect("Could not load"); assert!(entity.name == "dummy-name"); } + + #[test] + fn load_n() { + let generic_events = vec![ + GenericEvent { + id: uuid::Uuid::new_v4(), + sequence: 1, + event: serde_json::to_value(DummyEvent::Created("dummy-name".to_owned())) + .expect("Could not serialize"), + entity_created_at: chrono::Utc::now(), + event_recorded_at: chrono::Utc::now(), + }, + GenericEvent { + id: uuid::Uuid::new_v4(), + sequence: 1, + event: serde_json::to_value(DummyEvent::Created("other-name".to_owned())) + .expect("Could not serialize"), + entity_created_at: chrono::Utc::now(), + event_recorded_at: chrono::Utc::now(), + }, + ]; + let (entity, more): (Vec, _) = + EntityEvents::load_n(generic_events, 2).expect("Could not load"); + assert!(!more); + assert_eq!(entity.len(), 2); + } }