diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index af94716e9..b07fa36c3 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -88,13 +88,8 @@ where let Some(file) = file else { anyhow::bail!("FileInfoPoller sender was dropped unexpectedly"); }; - tracing::info!("Verifying file: {}", file.file_info); - let ts = file.file_info.timestamp; - let mut transaction = self.pool.begin().await?; - let reports = file.into_stream(&mut transaction).await?; - crate::accumulate::accumulate_sessions(&self.gateway_info_resolver, &self.authorization_verifier, &mut transaction, &self.invalid_data_session_report_sink, ts, reports).await?; - transaction.commit().await?; - self.invalid_data_session_report_sink.commit().await?; + + self.process_file(file).await?; }, _ = sleep_until(burn_time) => { // It's time to burn @@ -112,6 +107,33 @@ where } } } + + async fn process_file( + &self, + file: FileInfoStream, + ) -> Result<()> { + tracing::info!("Verifying file: {}", file.file_info); + + let ts = file.file_info.timestamp; + let mut transaction = self.pool.begin().await?; + let reports = file.into_stream(&mut transaction).await?; + + accumulate_sessions( + &self.gateway_info_resolver, + &self.authorization_verifier, + &mut transaction, + &self.invalid_data_session_report_sink, + ts, + reports, + ) + .await?; + + transaction.commit().await?; + self.invalid_data_session_report_sink.commit().await?; + self.pending_data_session_report_sink.commit().await?; + + Ok(()) + } } #[derive(Debug, clap::Args)]