Skip to content

Commit

Permalink
feat(notifictions): link email reminders
Browse files Browse the repository at this point in the history
  • Loading branch information
UncleSamtoshi committed Mar 21, 2024
1 parent 33968fc commit 2ae3397
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
CREATE TABLE email_reminder_projection (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
galoy_user_id VARCHAR UNIQUE NOT NULL,
user_created_at TIMESTAMPTZ NOT NULL,
last_activity_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_transaction_at TIMESTAMPTZ
user_first_seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_transaction_at TIMESTAMPTZ,
last_notified_at TIMESTAMPTZ
);
63 changes: 40 additions & 23 deletions core/notifications/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ use tracing::instrument;
use std::sync::Arc;

use crate::{
email_executor::EmailExecutor, email_reminder_projection::EmailReminderProjection, job,
notification_cool_off_tracker::*, notification_event::*, primitives::*, push_executor::*,
email_executor::EmailExecutor,
email_reminder_projection::EmailReminderProjection,
job::{self},
notification_cool_off_tracker::*,
notification_event::*,
primitives::*,
push_executor::*,
user_notification_settings::*,
};

Expand Down Expand Up @@ -147,21 +152,7 @@ impl NotificationsApp {
) -> Result<UserNotificationSettings, ApplicationError> {
let mut user_settings = self.settings.find_for_user_id(&user_id).await?;
user_settings.add_push_device_token(device_token);
let new_user = user_settings.is_new();
let mut tx = self.pool.begin().await?;
self.settings
.persist_in_tx(&mut tx, &mut user_settings)
.await?;
if new_user {
self.email_reminder_projection
.new_user_without_email(
&mut tx,
user_id,
user_settings.created_at().expect("already persisted"),
)
.await?;
}
tx.commit().await?;
self.settings.persist(&mut user_settings).await?;
Ok(user_settings)
}

Expand All @@ -185,7 +176,14 @@ impl NotificationsApp {
) -> Result<(), ApplicationError> {
let mut user_settings = self.settings.find_for_user_id(&user_id).await?;
user_settings.update_email_address(addr);
self.settings.persist(&mut user_settings).await?;
let mut tx = self.pool.begin().await?;
self.settings
.persist_in_tx(&mut tx, &mut user_settings)
.await?;
self.email_reminder_projection
.user_added_email(&mut tx, user_id)
.await?;
tx.commit().await?;
Ok(())
}

Expand Down Expand Up @@ -216,6 +214,29 @@ impl NotificationsApp {
Ok(())
}

#[instrument(name = "app.handle_transaction_occurred_event", skip(self), err)]
pub async fn handle_transaction_occurred_event(
&self,
user_id: GaloyUserId,
transaction_occurred: TransactionOccurred,
) -> Result<(), ApplicationError> {
let payload = NotificationEventPayload::from(transaction_occurred);
let mut tx = self.pool.begin().await?;
if payload.should_send_email() {
job::spawn_send_email_notification(&mut tx, (user_id.clone(), payload.clone())).await?;
}
job::spawn_send_push_notification(&mut tx, (user_id.clone(), payload.clone())).await?;

let user_settings = self.settings.find_for_user_id(&user_id).await?;
if user_settings.email_address().is_none() {
self.email_reminder_projection
.transaction_occurred_for_user_without_email(&mut tx, user_id)
.await?;
}
tx.commit().await?;
Ok(())
}

#[instrument(name = "app.handle_price_changed_event", skip(self), err)]
pub async fn handle_price_changed_event(
&self,
Expand Down Expand Up @@ -268,11 +289,7 @@ impl NotificationsApp {
) -> Result<(), ApplicationError> {
tokio::spawn(async move {
loop {
let _ = job::spawn_link_email_reminder(
&pool,
NotificationEventPayload::from(LinkEmailReminder {}),
)
.await;
let _ = job::spawn_link_email_reminder(&pool, ()).await;
tokio::time::sleep(delay).await;
}
});
Expand Down
101 changes: 98 additions & 3 deletions core/notifications/src/email_reminder_projection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ pub struct EmailReminderProjection {
_pool: PgPool,
}

const PAGINATION_BATCH_SIZE: i64 = 1000;

impl EmailReminderProjection {
const ACCOUNT_LIVENESS_THRESHOLD_DAYS: i64 = 21;
const ACCOUNT_AGED_THRESHOLD_DAYS: i64 = 21;
const NOTIFICATION_COOL_OFF_THRESHOLD_DAYS: i64 = 90;

pub fn new(pool: &PgPool) -> Self {
Self {
_pool: pool.clone(),
Expand All @@ -24,16 +30,105 @@ impl EmailReminderProjection {
&self,
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
galoy_user_id: GaloyUserId,
created_at: DateTime<Utc>,
user_first_seen_at: DateTime<Utc>,
) -> Result<(), EmailReminderProjectionError> {
sqlx::query!(
r#"INSERT INTO email_reminder_projection
(galoy_user_id, user_created_at) VALUES ($1, $2)"#,
(galoy_user_id, user_first_seen_at) VALUES ($1, $2)"#,
galoy_user_id.as_ref(),
created_at
user_first_seen_at
)
.execute(&mut **tx)
.await?;
Ok(())
}

pub async fn transaction_occurred_for_user_without_email(
&self,
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
galoy_user_id: GaloyUserId,
) -> Result<(), EmailReminderProjectionError> {
sqlx::query!(
r#"INSERT INTO email_reminder_projection
(galoy_user_id, last_transaction_at) VALUES ($1, now())
ON CONFLICT (galoy_user_id) DO UPDATE
SET last_transaction_at = now()"#,
galoy_user_id.as_ref(),
)
.execute(&mut **tx)
.await?;
Ok(())
}

pub async fn user_added_email(
&self,
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
galoy_user_id: GaloyUserId,
) -> Result<(), EmailReminderProjectionError> {
sqlx::query!(
r#"DELETE FROM email_reminder_projection
WHERE galoy_user_id = $1"#,
galoy_user_id.as_ref()
)
.execute(&mut **tx)
.await?;
Ok(())
}

pub async fn user_notified(
&self,
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
galoy_user_id: GaloyUserId,
) -> Result<(), EmailReminderProjectionError> {
// set last notified at to now
sqlx::query!(
r#"UPDATE email_reminder_projection
SET last_notified_at = now()
WHERE galoy_user_id = $1"#,
galoy_user_id.as_ref()
)
.execute(&mut **tx)
.await?;
Ok(())
}

pub async fn list_ids_to_notify_after(
&self,
search_id: GaloyUserId,
) -> Result<(Vec<GaloyUserId>, bool), EmailReminderProjectionError> {
let last_transaction_at_threshold = Utc::now()
- Duration::try_days(Self::ACCOUNT_LIVENESS_THRESHOLD_DAYS)
.expect("Should be valid duration");
let user_first_seen_at_threshold = Utc::now()
- Duration::try_days(Self::ACCOUNT_AGED_THRESHOLD_DAYS)
.expect("Should be valid duration");
let last_notified_at_threshold = Utc::now()
- Duration::try_days(Self::NOTIFICATION_COOL_OFF_THRESHOLD_DAYS)
.expect("Should be valid duration");

let rows = sqlx::query!(
r#"SELECT galoy_user_id
FROM email_reminder_projection
WHERE galoy_user_id > $1 AND last_transaction_at IS NOT NULL AND last_transaction_at > $2 AND user_first_seen_at < $3 AND (last_notified_at IS NULL OR last_notified_at < $4)
ORDER BY galoy_user_id
LIMIT $5"#,
search_id.as_ref(),
last_transaction_at_threshold,
user_first_seen_at_threshold,
last_notified_at_threshold,
PAGINATION_BATCH_SIZE + 1,
)
.fetch_all(&self._pool)
.await?;

let more = rows.len() > PAGINATION_BATCH_SIZE as usize;

Ok((
rows.into_iter()
.take(PAGINATION_BATCH_SIZE as usize)
.map(|r| GaloyUserId::from(r.galoy_user_id))
.collect(),
more,
))
}
}
2 changes: 1 addition & 1 deletion core/notifications/src/grpc/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ impl NotificationsService for Notifications {
.map_err(|e| Status::invalid_argument(e.to_string()))?;
let user_id = GaloyUserId::from(user_id);
self.app
.handle_single_user_event(
.handle_transaction_occurred_event(
user_id,
notification_event::TransactionOccurred {
transaction_type,
Expand Down
18 changes: 10 additions & 8 deletions core/notifications/src/job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,24 +98,27 @@ async fn link_email_reminder(
.execute(|data| async move {
let data: LinkEmailReminderData = data.expect("no LinkEmailReminderData available");
let (ids, more) = email_reminder_projection
.list_ids_after(&data.search_id) this needs to be replaced with new query
.list_ids_to_notify_after(data.search_id.clone())
.await?;
let mut tx = pool.begin().await?;

if more {
let data = LinkEmailReminderData {
search_id: ids.last().expect("there should always be an id").clone(),
payload: data.payload.clone(),
tracing_data: tracing::extract_tracing_data(),
};
spawn_link_email_reminder(&pool, data).await?;
}
for user_id in ids {
let payload = data.payload.clone();
let payload = NotificationEventPayload::from(LinkEmailReminder {});
if payload.should_send_email() {
spawn_send_email_notification(&mut tx, (user_id.clone(), payload.clone()))
.await?;
}
spawn_send_push_notification(&mut tx, (user_id, payload)).await?;
spawn_send_push_notification(&mut tx, (user_id.clone(), payload.clone())).await?;
email_reminder_projection
.user_notified(&mut tx, user_id.clone())
.await?;
}
Ok::<_, JobError>(JobResult::CompleteWithTx(tx))
})
Expand Down Expand Up @@ -246,6 +249,7 @@ pub async fn spawn_link_email_reminder(
data: impl Into<LinkEmailReminderData>,
) -> Result<(), JobError> {
let data = data.into();
// TODO: I think reusing the id here might cause a bug
match JobBuilder::new_with_id(LINK_EMAIL_REMINDER_ID, "link_email_reminder")
.set_channel_name("link_email_reminder")
.set_json(&data)
Expand Down Expand Up @@ -322,16 +326,14 @@ impl From<NotificationEventPayload> for AllUserEventDispatchData {
#[derive(Debug, Serialize, Deserialize)]
pub(super) struct LinkEmailReminderData {
search_id: GaloyUserId,
payload: NotificationEventPayload,
#[serde(flatten)]
pub(super) tracing_data: HashMap<String, serde_json::Value>,
}

impl From<NotificationEventPayload> for LinkEmailReminderData {
fn from(payload: NotificationEventPayload) -> Self {
impl From<()> for LinkEmailReminderData {
fn from(_: ()) -> Self {
Self {
search_id: GaloyUserId::search_begin(),
payload,
tracing_data: tracing::extract_tracing_data(),
}
}
Expand Down

0 comments on commit 2ae3397

Please sign in to comment.