diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index a307bbfd1..318ce8de3 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -165,7 +165,7 @@ pub const SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str = pub const VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str = "verified_subscriber_verified_mapping_ingest_report"; pub const PROMOTION_REWARD_INGEST_REPORT: &str = "promotion_reward_ingest_report"; -pub const VERIFIED_PROMOTION_REWARD_INGEST_REPORT: &str = "verified_promotion_reward_ingest_report"; +pub const VERIFIED_PROMOTION_REWARD: &str = "verified_promotion_reward"; #[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)] #[serde(rename_all = "snake_case")] @@ -223,7 +223,7 @@ pub enum FileType { SubscriberVerifiedMappingEventIngestReport, VerifiedSubscriberVerifiedMappingEventIngestReport, PromotionRewardIngestReport, - VerifiedPromotionRewardIngestReport, + VerifiedPromotionReward, } impl fmt::Display for FileType { @@ -296,7 +296,7 @@ impl fmt::Display for FileType { VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT } Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT, - Self::VerifiedPromotionRewardIngestReport => VERIFIED_PROMOTION_REWARD_INGEST_REPORT, + Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD, }; f.write_str(s) } @@ -372,7 +372,7 @@ impl FileType { VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT } Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT, - Self::VerifiedPromotionRewardIngestReport => VERIFIED_PROMOTION_REWARD_INGEST_REPORT, + Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD, } } } @@ -448,7 +448,7 @@ impl FromStr for FileType { Self::VerifiedSubscriberVerifiedMappingEventIngestReport } PROMOTION_REWARD_INGEST_REPORT => Self::PromotionRewardIngestReport, - VERIFIED_PROMOTION_REWARD_INGEST_REPORT => Self::VerifiedPromotionRewardIngestReport, + VERIFIED_PROMOTION_REWARD => Self::VerifiedPromotionReward, _ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))), }; Ok(result) diff --git a/file_store/src/traits/file_sink_write.rs b/file_store/src/traits/file_sink_write.rs index fcd539cc7..b80892a43 100644 --- a/file_store/src/traits/file_sink_write.rs +++ b/file_store/src/traits/file_sink_write.rs @@ -247,3 +247,13 @@ impl_file_sink!( FileType::RewardManifest.to_str(), "reward_manifest" ); +impl_file_sink!( + poc_mobile::PromotionRewardIngestReportV1, + FileType::PromotionRewardIngestReport.to_str(), + "promotion_reward_ingest_report" +); +impl_file_sink!( + poc_mobile::VerifiedPromotionRewardV1, + FileType::VerifiedPromotionReward.to_str(), + "verified_promotion_reward" +); diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index 5e57c0d4d..1bfd2d7e1 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -460,10 +460,7 @@ impl poc_mobile::PocMobile for GrpcServer { report: Some(event), })?; - let _ = self - .promotion_reward_sink - .write(report, []) - .await; + let _ = self.promotion_reward_sink.write(report, []).await; let id = received_timestamp.to_string(); Ok(Response::new(PromotionRewardRespV1 { id })) @@ -568,17 +565,12 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { .await?; let (subscriber_referral_eligibility_sink, subscriber_referral_eligibility_server) = - file_sink::FileSinkBuilder::new( - FileType::PromotionRewardIngestReport, + PromotionRewardIngestReportV1::file_sink( store_base_path, file_upload.clone(), - concat!( - env!("CARGO_PKG_NAME"), - "_subscriber_referral_eligibility_ingest_report" - ), + Some(settings.roll_time), + env!("CARGO_PKG_NAME"), ) - .roll_time(settings.roll_time) - .create() .await?; let Some(api_token) = settings diff --git a/mobile_verifier/src/promotion_reward.rs b/mobile_verifier/src/promotion_reward.rs index 0784d88b0..25fafe189 100644 --- a/mobile_verifier/src/promotion_reward.rs +++ b/mobile_verifier/src/promotion_reward.rs @@ -1,11 +1,11 @@ use chrono::{DateTime, Utc}; use file_store::{ file_info_poller::{FileInfoStream, LookbackBehavior}, - file_sink::{self, FileSinkClient}, + file_sink::FileSinkClient, file_source, file_upload::FileUpload, promotion_reward::{Entity, PromotionReward}, - traits::TimestampEncode, + traits::{FileSinkWriteExt, TimestampEncode}, FileType, }; use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt}; @@ -35,7 +35,7 @@ pub struct PromotionRewardDaemon { pool: PgPool, carrier_client: CarrierServiceClient, promotion_rewards: Receiver>, - promotion_rewards_sink: FileSinkClient, + promotion_rewards_sink: FileSinkClient, } impl PromotionRewardDaemon { @@ -47,15 +47,12 @@ impl PromotionRewardDaemon { carrier_client: CarrierServiceClient, ) -> anyhow::Result { let (promotion_rewards_sink, valid_promotion_rewards_server) = - file_sink::FileSinkBuilder::new( - FileType::VerifiedPromotionRewardIngestReport, + VerifiedPromotionRewardV1::file_sink( settings.store_base_path(), file_upload.clone(), - concat!(env!("CARGO_PKG_NAME"), "_promotion_reward"), + Some(Duration::from_secs(15 * 60)), + env!("CARGO_PKG_NAME"), ) - .auto_commit(false) - .roll_time(Duration::from_secs(15 * 60)) - .create() .await?; let (promotion_rewards, promotion_rewards_server) = @@ -183,7 +180,10 @@ impl VerifiedPromotionReward { matches!(self.validity, PromotionRewardStatus::Valid) } - async fn write(&self, promotion_rewards: &FileSinkClient) -> anyhow::Result<()> { + async fn write( + &self, + promotion_rewards: &FileSinkClient, + ) -> anyhow::Result<()> { promotion_rewards .write( VerifiedPromotionRewardV1 {