diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml
index 3db75735f..21500e3a7 100644
--- a/.github/workflows/CI.yml
+++ b/.github/workflows/CI.yml
@@ -92,7 +92,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        package: [boost-manager,iot-config,iot-packet-verifier,iot-verifier,mobile-config,mobile-verifier]
+        package: [boost-manager,file-store,iot-config,iot-packet-verifier,iot-verifier,mobile-config,mobile-verifier]
     concurrency:
       group: ${{ github.workflow }}-${{ github.ref }}-tests-postgres-${{ matrix.package }}
       cancel-in-progress: true
@@ -142,7 +142,7 @@
     strategy:
       fail-fast: false
       matrix:
-        package: [coverage-map,coverage-point-calculator,file-store,ingest,mobile-packet-verifier,reward-scheduler,task-manager]
+        package: [coverage-map,coverage-point-calculator,ingest,mobile-packet-verifier,reward-scheduler,task-manager]
     concurrency:
       group: ${{ github.workflow }}-${{ github.ref }}-tests-${{ matrix.package }}
       cancel-in-progress: true
diff --git a/file_store/src/file_info_poller.rs b/file_store/src/file_info_poller.rs
index 7d80183bf..ecaa9bfdb 100644
--- a/file_store/src/file_info_poller.rs
+++ b/file_store/src/file_info_poller.rs
@@ -5,6 +5,7 @@ use derive_builder::Builder;
 use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt};
 use futures_util::TryFutureExt;
 use retainer::Cache;
+use sqlx::postgres::PgQueryResult;
 use std::{collections::VecDeque, marker::PhantomData, sync::Arc, time::Duration};
 use task_manager::ManagedTask;
 use tokio::sync::mpsc::{Receiver, Sender};
@@ -27,7 +28,13 @@ pub trait FileInfoPollerState: Send + Sync + 'static {
 
     async fn exists(&self, process_name: &str, file_info: &FileInfo) -> Result<bool>;
 
-    async fn clean(&self, process_name: &str, file_type: &str) -> Result<()>;
+    // Returns number of items cleaned
+    async fn clean(
+        &self,
+        process_name: &str,
+        file_type: &str,
+        offset: DateTime<Utc>,
+    ) -> Result<u64>;
 }
 
 #[async_trait::async_trait]
@@ -40,6 +47,19 @@ pub trait FileInfoPollerStateRecorder {
     async fn record(self, process_name: &str, file_info: &FileInfo) -> Result<()>;
 }
 
+#[async_trait::async_trait]
+pub trait FileInfoPollerStore: Send + Sync + 'static {
+    async fn list_all<A, B>(&self, file_type: &str, after: A, before: B) -> Result<Vec<FileInfo>>
+    where
+        A: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
+        B: Into<Option<DateTime<Utc>>> + Send + Sync + Copy;
+
+    async fn get_raw<K>(&self, key: K) -> Result<ByteStream>
+    where
+        K: Into<String> + Send + Sync;
+}
+
+#[derive(Debug)]
 pub struct FileInfoStream<T> {
     pub file_info: FileInfo,
     process_name: String,
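Annotation (not part of the patch): the new `FileInfoPollerStore` trait abstracts the S3-backed `FileStore` that the poller config previously hard-coded. A pass-through impl for `FileStore` appears further down in this diff, and this seam is what lets the new `#[sqlx::test]` exercise cleaning against Postgres alone, which is also why `file-store` moves into the postgres CI matrix above. As a mental model, a minimal in-memory store only has to answer `list_all` and `get_raw`; this hedged sketch assumes the crate's `FileInfo`, `ByteStream`, and `Result` types and mirrors the `TestStore` used in the tests further down:

```rust
// Hypothetical in-memory store, for illustration only.
struct VecStore(Vec<FileInfo>);

#[async_trait::async_trait]
impl FileInfoPollerStore for VecStore {
    async fn list_all<A, B>(&self, _file_type: &str, after: A, before: B) -> Result<Vec<FileInfo>>
    where
        A: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
        B: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
    {
        let (after, before) = (after.into(), before.into());
        // Keep only files inside the (after, before] window, matching the
        // filtering the test store below performs.
        Ok(self
            .0
            .iter()
            .filter(|f| after.map_or(true, |a| f.timestamp > a))
            .filter(|f| before.map_or(true, |b| f.timestamp <= b))
            .cloned()
            .collect())
    }

    async fn get_raw<K>(&self, _key: K) -> Result<ByteStream>
    where
        K: Into<String> + Send + Sync,
    {
        // No real object bytes behind an in-memory listing.
        Ok(ByteStream::default())
    }
}
```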
@@ -84,13 +104,13 @@ pub enum LookbackBehavior {
 
 #[derive(Debug, Clone, Builder)]
 #[builder(pattern = "owned")]
-pub struct FileInfoPollerConfig<T, S, P> {
+pub struct FileInfoPollerConfig<Message, State, Store, Parser> {
     #[builder(default = "DEFAULT_POLL_DURATION")]
     poll_duration: Duration,
-    state: S,
-    store: FileStore,
+    state: State,
+    store: Store,
     prefix: String,
-    parser: P,
+    parser: Parser,
     lookback: LookbackBehavior,
     #[builder(default = "Duration::from_secs(10 * 60)")]
     offset: Duration,
@@ -99,28 +119,33 @@
     #[builder(default = r#""default".to_string()"#)]
     process_name: String,
     #[builder(setter(skip))]
-    p: PhantomData<T>,
+    p: PhantomData<Message>,
 }
 
 #[derive(Clone)]
-pub struct FileInfoPollerServer<T, S, P> {
-    config: FileInfoPollerConfig<T, S, P>,
-    sender: Sender<FileInfoStream<T>>,
+pub struct FileInfoPollerServer<Message, State, Store, Parser> {
+    config: FileInfoPollerConfig<Message, State, Store, Parser>,
+    sender: Sender<FileInfoStream<Message>>,
     file_queue: VecDeque<FileInfo>,
     latest_file_timestamp: Option<DateTime<Utc>>,
     cache: MemoryFileCache,
 }
 
 type FileInfoStreamReceiver<T> = Receiver<FileInfoStream<T>>;
-impl<T, S, P> FileInfoPollerConfigBuilder<T, S, P>
+
+impl<Message, State, Store, Parser> FileInfoPollerConfigBuilder<Message, State, Store, Parser>
 where
-    T: Clone,
-    S: FileInfoPollerState,
-    P: FileInfoPollerParser<T>,
+    Message: Clone,
+    State: FileInfoPollerState,
+    Parser: FileInfoPollerParser<Message>,
+    Store: FileInfoPollerStore,
 {
     pub async fn create(
         self,
-    ) -> Result<(FileInfoStreamReceiver<T>, FileInfoPollerServer<T, S, P>)> {
+    ) -> Result<(
+        FileInfoStreamReceiver<Message>,
+        FileInfoPollerServer<Message, State, Store, Parser>,
+    )> {
         let config = self.build()?;
         let (sender, receiver) = tokio::sync::mpsc::channel(config.queue_size);
         let latest_file_timestamp = config
@@ -141,11 +166,11 @@ where
     }
 }
 
-impl<T, S, P> ManagedTask for FileInfoPollerServer<T, S, P>
+impl<Message, State, Store: FileInfoPollerStore, Parser> ManagedTask for FileInfoPollerServer<Message, State, Store, Parser>
 where
-    T: Send + Sync + 'static,
-    S: FileInfoPollerState,
-    P: FileInfoPollerParser<T>,
+    Message: Send + Sync + 'static,
+    State: FileInfoPollerState,
+    Parser: FileInfoPollerParser<Message>,
 {
     fn start_task(
         self: Box<Self>,
@@ -161,11 +186,12 @@ where
     }
 }
 
-impl<T, S, P> FileInfoPollerServer<T, S, P>
+impl<Message, State, Store, Parser> FileInfoPollerServer<Message, State, Store, Parser>
 where
-    T: Send + Sync + 'static,
-    S: FileInfoPollerState,
-    P: FileInfoPollerParser<T>,
+    Message: Send + Sync + 'static,
+    State: FileInfoPollerState,
+    Parser: FileInfoPollerParser<Message>,
+    Store: FileInfoPollerStore + Send + Sync + 'static,
 {
     pub async fn start(
         self,
@@ -253,11 +279,26 @@ where
     }
 
     async fn clean(&self, cache: &MemoryFileCache) -> Result<()> {
+        let cache_before = cache.len().await;
         cache.purge(4, 0.25).await;
-        self.config
+        let cache_after = cache.len().await;
+
+        let db_removed = self
+            .config
             .state
-            .clean(&self.config.process_name, &self.config.prefix)
+            .clean(
+                &self.config.process_name,
+                &self.config.prefix,
+                self.after(self.latest_file_timestamp),
+            )
             .await?;
+
+        tracing::info!(
+            cache_removed = cache_before - cache_after,
+            db_removed,
+            "cache clean"
+        );
+
         Ok(())
     }
@@ -355,6 +396,24 @@ async fn cache_file(cache: &MemoryFileCache, file_info: &FileInfo) {
     cache.insert(file_info.key.clone(), true, CACHE_TTL).await;
 }
 
+#[async_trait::async_trait]
+impl FileInfoPollerStore for FileStore {
+    async fn list_all<A, B>(&self, file_type: &str, after: A, before: B) -> Result<Vec<FileInfo>>
+    where
+        A: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
+        B: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
+    {
+        self.list_all(file_type, after, before).await
+    }
+
+    async fn get_raw<K>(&self, key: K) -> Result<ByteStream>
+    where
+        K: Into<String> + Send + Sync,
+    {
+        self.get_raw(key).await
+    }
+}
+
 #[cfg(feature = "sqlx-postgres")]
 #[async_trait::async_trait]
 impl FileInfoPollerStateRecorder for &mut sqlx::Transaction<'_, sqlx::Postgres> {
@@ -408,23 +467,208 @@ impl FileInfoPollerState for sqlx::Pool<sqlx::Postgres> {
             .map_err(Error::from)
     }
 
-    async fn clean(&self, process_name: &str, file_type: &str) -> Result<()> {
-        sqlx::query(
+    async fn clean(
+        &self,
+        process_name: &str,
+        file_type: &str,
+        offset: DateTime<Utc>,
+    ) -> Result<u64> {
+        let t100_timestamp: Option<DateTime<Utc>> = sqlx::query_scalar(
+            r#"
+            SELECT file_timestamp
+            FROM files_processed
+            WHERE process_name = $1
+                AND file_type = $2
+            ORDER BY file_timestamp DESC
+            LIMIT 1 OFFSET 100;
+            "#,
+        )
+        .bind(process_name)
+        .bind(file_type)
+        .fetch_optional(self)
+        .await?;
+
+        let Some(t100) = t100_timestamp else {
+            // The cleaning limit has not been reached; remove nothing.
+            return Ok(0);
+        };
+
+        // To keep from reprocessing files, we need to make sure rows that
+        // exist within the offset window are not removed.
+        let older_than_limit = t100.min(offset);
+
+        let query_result: PgQueryResult = sqlx::query(
             r#"
-            DELETE FROM files_processed where file_name in (
-                SELECT file_name
-                FROM files_processed
-                WHERE process_name = $1 and file_type = $2
-                ORDER BY file_timestamp DESC
-                OFFSET 100
-            )
+            DELETE FROM files_processed
+            WHERE process_name = $1
+                AND file_type = $2
+                AND file_timestamp < $3
             "#,
         )
         .bind(process_name)
         .bind(file_type)
+        .bind(older_than_limit)
         .execute(self)
         .await
-        .map(|_| ())
-        .map_err(Error::from)
+        .map_err(Error::from)?;
+
+        Ok(query_result.rows_affected())
+    }
+}
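Annotation: the behavioral fix is the `DELETE` above. The old query unconditionally removed every row past the 100 newest, so a restart shortly after a clean could find rows missing inside the lookback `offset` and re-queue their files. The new query removes a row only when it is older than both the 100th-newest timestamp and the offset cutoff. Stated as a predicate (illustrative names, not part of the patch):

```rust
use chrono::{DateTime, Utc};

/// A `files_processed` row may be deleted only when it is older than BOTH
/// the 100th-newest timestamp (`t100`) and the lookback/offset cutoff,
/// i.e. `file_timestamp < min(t100, offset_cutoff)`, matching the DELETE above.
fn deletable(row: DateTime<Utc>, t100: DateTime<Utc>, offset_cutoff: DateTime<Utc>) -> bool {
    row < t100.min(offset_cutoff)
}
```

For example, a row that is two hours old is past the 100-row floor when `t100` is 100 seconds old, but with a six-hour cutoff `deletable` is false and the row survives.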
+
+#[cfg(test)]
+mod tests {
+
+    use sqlx::{Executor, PgPool};
+    use std::time::Duration;
+    use tokio::time::timeout;
+
+    use super::*;
+
+    struct TestParser;
+    struct TestStore(Vec<FileInfo>);
+
+    #[async_trait::async_trait]
+    impl FileInfoPollerParser<String> for TestParser {
+        async fn parse(&self, _byte_stream: ByteStream) -> Result<Vec<String>> {
+            Ok(vec![])
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl FileInfoPollerStore for TestStore {
+        async fn list_all<A, B>(
+            &self,
+            _file_type: &str,
+            after: A,
+            before: B,
+        ) -> Result<Vec<FileInfo>>
+        where
+            A: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
+            B: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
+        {
+            let after = after.into();
+            let before = before.into();
+
+            Ok(self
+                .0
+                .clone()
+                .into_iter()
+                .filter(|file_info| after.map_or(true, |v| file_info.timestamp > v))
+                .filter(|file_info| before.map_or(true, |v| file_info.timestamp <= v))
+                .collect())
+        }
+
+        async fn get_raw<K>(&self, _key: K) -> Result<ByteStream>
+        where
+            K: Into<String> + Send + Sync,
+        {
+            Ok(ByteStream::default())
+        }
+    }
+
+    #[sqlx::test]
+    async fn do_not_reprocess_files_when_offset_exceeds_earliest_file(
+        pool: PgPool,
+    ) -> anyhow::Result<()> {
+        // Cleaning the files_processed table should not cause files within the
+        // `FileInfoPoller.config.offset` window to be reprocessed.
+
+        // There is no auto-migration for tests in this lib workspace.
+        pool.execute(
+            r#"
+            CREATE TABLE files_processed (
+                process_name TEXT NOT NULL DEFAULT 'default',
+                file_name VARCHAR PRIMARY KEY,
+                file_type VARCHAR NOT NULL,
+                file_timestamp TIMESTAMPTZ NOT NULL,
+                processed_at TIMESTAMPTZ NOT NULL
+            );
+            "#,
+        )
+        .await?;
+
+        // The important aspect of this test is that all the files to be
+        // processed fall _within_ the lookback offset.
+        const EXPECTED_FILE_COUNT: i64 = 150;
+        let mut infos = vec![];
+        for seconds in 0..EXPECTED_FILE_COUNT {
+            let file_info = FileInfo {
+                key: format!("key-{seconds}"),
+                prefix: "file_type".to_string(),
+                timestamp: Utc::now() - chrono::Duration::seconds(seconds),
+                size: 42,
+            };
+            infos.push(file_info);
+        }
+
+        // To simulate a restart, we make a second FileInfoPoller. This
+        // closure ensures both pollers get the same settings.
+        let file_info_builder = || {
+            let six_hours = chrono::Duration::hours(6).to_std().unwrap();
+            FileInfoPollerConfigBuilder::<String, _, _, _>::default()
+                .parser(TestParser)
+                .state(pool.clone())
+                .store(TestStore(infos.clone()))
+                .lookback(LookbackBehavior::Max(six_hours))
+                .prefix("file_type".to_string())
+                .offset(six_hours)
+                .create()
+        };
+
+        // On the first startup of the file info poller there is nothing to
+        // clean, and all file_infos are returned to be processed.
+        let (mut receiver, ingest_server) = file_info_builder().await?;
+        let (trigger, shutdown) = triggered::trigger();
+        tokio::spawn(async move {
+            if let Err(status) = ingest_server.run(shutdown).await {
+                println!("ingest server went down unexpectedly: {status:?}");
+            }
+        });
+
+        // "process" all the files. They are not recorded into the database
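Annotation: the test's numbers make the invariant concrete. With 150 rows spaced one second apart, `t100` (the `LIMIT 1 OFFSET 100` row) is roughly 100 seconds old while the offset cutoff is six hours in the past, so `min(t100, cutoff)` equals the cutoff and no row qualifies for deletion. That is why the second poller times out instead of re-delivering files. A standalone check of the arithmetic (illustrative, assumes only `chrono`):

```rust
use chrono::{Duration, Utc};

fn main() {
    let now = Utc::now();
    let t100 = now - Duration::seconds(100); // 101st-newest of 150 rows
    let offset_cutoff = now - Duration::hours(6); // lookback/offset window
    let older_than_limit = t100.min(offset_cutoff);

    // Even the oldest test row (149s old) is newer than the delete limit,
    // so the clean removes nothing and no file is reprocessed.
    let oldest_row = now - Duration::seconds(149);
    assert!(oldest_row > older_than_limit);
}
```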
+        // until the file is consumed as a stream.
+        let mut processed = 0;
+        while processed < EXPECTED_FILE_COUNT {
+            match timeout(Duration::from_secs(1), receiver.recv()).await? {
+                Some(msg) => {
+                    processed += 1;
+                    let mut txn = pool.begin().await?;
+                    let _x = msg.into_stream(&mut txn).await?;
+                    txn.commit().await?;
+                }
+                err => panic!("something went wrong: {err:?}"),
+            };
+        }
+
+        // Shut down the ingest server; we're going to create a new one and start it.
+        trigger.trigger();
+
+        // On the second startup of the file info poller, 100+ files have been
+        // processed. The initial clean should not remove processed files in a
+        // way that causes us to re-receive any files within our offset for
+        // processing.
+        let (mut receiver, ingest_server) = file_info_builder().await?;
+        let (trigger, shutdown) = triggered::trigger();
+        let _handle = tokio::spawn(async move {
+            if let Err(status) = ingest_server.run(shutdown).await {
+                println!("ingest server went down unexpectedly: {status:?}");
+            }
+        });
+
+        // Attempt to receive files for processing. The timeout should fire
+        // because all the files we set up exist within the offset and should
+        // still be in the database.
+        match timeout(Duration::from_secs(1), receiver.recv()).await {
+            Err(_err) => (),
+            Ok(msg) => {
+                panic!("we got something when we expected nothing: {msg:?}");
+            }
+        }
+
+        // Shut down for great good
+        trigger.trigger();
+
+        Ok(())
     }
 }
diff --git a/file_store/src/file_source.rs b/file_store/src/file_source.rs
index ff77f1162..a028d2d8b 100644
--- a/file_store/src/file_source.rs
+++ b/file_store/src/file_source.rs
@@ -1,6 +1,6 @@
 use crate::{
     file_info_poller::{FileInfoPollerConfigBuilder, MsgDecodeFileInfoPollerParser},
-    file_sink, BytesMutStream, Error,
+    file_sink, BytesMutStream, Error, FileStore,
 };
 use async_compression::tokio::bufread::GzipDecoder;
 use futures::{
@@ -11,11 +11,12 @@ use std::path::{Path, PathBuf};
 use tokio::{fs::File, io::BufReader};
 use tokio_util::codec::{length_delimited::LengthDelimitedCodec, FramedRead};
 
-pub fn continuous_source<T, S>() -> FileInfoPollerConfigBuilder<T, S, MsgDecodeFileInfoPollerParser>
+pub fn continuous_source<T, S>(
+) -> FileInfoPollerConfigBuilder<T, S, FileStore, MsgDecodeFileInfoPollerParser>
 where
     T: Clone,
 {
-    FileInfoPollerConfigBuilder::<T, S, MsgDecodeFileInfoPollerParser>::default()
+    FileInfoPollerConfigBuilder::<T, S, FileStore, MsgDecodeFileInfoPollerParser>::default()
         .parser(MsgDecodeFileInfoPollerParser)
 }
 
diff --git a/mobile_verifier/src/sp_boosted_rewards_bans.rs b/mobile_verifier/src/sp_boosted_rewards_bans.rs
index 697b12960..e3ea08cb9 100644
--- a/mobile_verifier/src/sp_boosted_rewards_bans.rs
+++ b/mobile_verifier/src/sp_boosted_rewards_bans.rs
@@ -166,6 +166,7 @@ where
         ServiceProviderBoostedRewardsBannedRadioIngestReportV1,
         _,
         _,
+        _,
     >::default()
     .parser(ProstFileInfoPollerParser)
     .state(pool.clone())
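Annotation: for downstream crates the migration is mechanical, one extra `Store` type parameter that is inferred as `FileStore` wherever `.store(...)` receives an S3 store, exactly as the `mobile_verifier` hunk above shows. A hedged sketch of a typical call site follows; `Report`, `pool`, and `file_store` are assumed bindings inside an async fn returning `anyhow::Result`, not part of this diff:

```rust
// Four type parameters now: Message, State, Store, Parser. Only the message
// type needs to be spelled out; the rest are inferred from the setters.
let (receiver, server) = FileInfoPollerConfigBuilder::<Report, _, _, _>::default()
    .parser(ProstFileInfoPollerParser)
    .state(pool.clone())
    .store(file_store)
    .prefix("report_prefix".to_string())
    .lookback(LookbackBehavior::Max(std::time::Duration::from_secs(6 * 60 * 60)))
    .create()
    .await?;
```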