Skip to content

Commit

Permalink
Fixes for moogey's thing
Browse files Browse the repository at this point in the history
  • Loading branch information
maplant committed Aug 26, 2024
1 parent 8cf8731 commit 9fd5420
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 27 deletions.
10 changes: 5 additions & 5 deletions file_store/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ pub const SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str =
pub const VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT: &str =
"verified_subscriber_verified_mapping_ingest_report";
pub const PROMOTION_REWARD_INGEST_REPORT: &str = "promotion_reward_ingest_report";
pub const VERIFIED_PROMOTION_REWARD_INGEST_REPORT: &str = "verified_promotion_reward_ingest_report";
pub const VERIFIED_PROMOTION_REWARD: &str = "verified_promotion_reward";

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -223,7 +223,7 @@ pub enum FileType {
SubscriberVerifiedMappingEventIngestReport,
VerifiedSubscriberVerifiedMappingEventIngestReport,
PromotionRewardIngestReport,
VerifiedPromotionRewardIngestReport,
VerifiedPromotionReward,
}

impl fmt::Display for FileType {
Expand Down Expand Up @@ -296,7 +296,7 @@ impl fmt::Display for FileType {
VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT
}
Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT,
Self::VerifiedPromotionRewardIngestReport => VERIFIED_PROMOTION_REWARD_INGEST_REPORT,
Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD,
};
f.write_str(s)
}
Expand Down Expand Up @@ -372,7 +372,7 @@ impl FileType {
VERIFIED_SUBSCRIBER_VERIFIED_MAPPING_INGEST_REPORT
}
Self::PromotionRewardIngestReport => PROMOTION_REWARD_INGEST_REPORT,
Self::VerifiedPromotionRewardIngestReport => VERIFIED_PROMOTION_REWARD_INGEST_REPORT,
Self::VerifiedPromotionReward => VERIFIED_PROMOTION_REWARD,
}
}
}
Expand Down Expand Up @@ -448,7 +448,7 @@ impl FromStr for FileType {
Self::VerifiedSubscriberVerifiedMappingEventIngestReport
}
PROMOTION_REWARD_INGEST_REPORT => Self::PromotionRewardIngestReport,
VERIFIED_PROMOTION_REWARD_INGEST_REPORT => Self::VerifiedPromotionRewardIngestReport,
VERIFIED_PROMOTION_REWARD => Self::VerifiedPromotionReward,
_ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))),
};
Ok(result)
Expand Down
10 changes: 10 additions & 0 deletions file_store/src/traits/file_sink_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,13 @@ impl_file_sink!(
FileType::RewardManifest.to_str(),
"reward_manifest"
);
impl_file_sink!(
poc_mobile::PromotionRewardIngestReportV1,
FileType::PromotionRewardIngestReport.to_str(),
"promotion_reward_ingest_report"
);
impl_file_sink!(
poc_mobile::VerifiedPromotionRewardV1,
FileType::VerifiedPromotionReward.to_str(),
"verified_promotion_reward"
);
16 changes: 4 additions & 12 deletions ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,10 +460,7 @@ impl poc_mobile::PocMobile for GrpcServer {
report: Some(event),
})?;

let _ = self
.promotion_reward_sink
.write(report, [])
.await;
let _ = self.promotion_reward_sink.write(report, []).await;

let id = received_timestamp.to_string();
Ok(Response::new(PromotionRewardRespV1 { id }))
Expand Down Expand Up @@ -568,17 +565,12 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
.await?;

let (subscriber_referral_eligibility_sink, subscriber_referral_eligibility_server) =
file_sink::FileSinkBuilder::new(
FileType::PromotionRewardIngestReport,
PromotionRewardIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
concat!(
env!("CARGO_PKG_NAME"),
"_subscriber_referral_eligibility_ingest_report"
),
Some(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.roll_time(settings.roll_time)
.create()
.await?;

let Some(api_token) = settings
Expand Down
20 changes: 10 additions & 10 deletions mobile_verifier/src/promotion_reward.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use chrono::{DateTime, Utc};
use file_store::{
file_info_poller::{FileInfoStream, LookbackBehavior},
file_sink::{self, FileSinkClient},
file_sink::FileSinkClient,
file_source,
file_upload::FileUpload,
promotion_reward::{Entity, PromotionReward},
traits::TimestampEncode,
traits::{FileSinkWriteExt, TimestampEncode},
FileType,
};
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
Expand Down Expand Up @@ -35,7 +35,7 @@ pub struct PromotionRewardDaemon {
pool: PgPool,
carrier_client: CarrierServiceClient,
promotion_rewards: Receiver<FileInfoStream<PromotionReward>>,
promotion_rewards_sink: FileSinkClient,
promotion_rewards_sink: FileSinkClient<VerifiedPromotionRewardV1>,
}

impl PromotionRewardDaemon {
Expand All @@ -47,15 +47,12 @@ impl PromotionRewardDaemon {
carrier_client: CarrierServiceClient,
) -> anyhow::Result<impl ManagedTask> {
let (promotion_rewards_sink, valid_promotion_rewards_server) =
file_sink::FileSinkBuilder::new(
FileType::VerifiedPromotionRewardIngestReport,
VerifiedPromotionRewardV1::file_sink(
settings.store_base_path(),
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_promotion_reward"),
Some(Duration::from_secs(15 * 60)),
env!("CARGO_PKG_NAME"),
)
.auto_commit(false)
.roll_time(Duration::from_secs(15 * 60))
.create()
.await?;

let (promotion_rewards, promotion_rewards_server) =
Expand Down Expand Up @@ -183,7 +180,10 @@ impl VerifiedPromotionReward {
matches!(self.validity, PromotionRewardStatus::Valid)
}

async fn write(&self, promotion_rewards: &FileSinkClient) -> anyhow::Result<()> {
async fn write(
&self,
promotion_rewards: &FileSinkClient<VerifiedPromotionRewardV1>,
) -> anyhow::Result<()> {
promotion_rewards
.write(
VerifiedPromotionRewardV1 {
Expand Down

0 comments on commit 9fd5420

Please sign in to comment.