diff --git a/iot_verifier/src/poc.rs b/iot_verifier/src/poc.rs index 95b67f424..2a4392e16 100644 --- a/iot_verifier/src/poc.rs +++ b/iot_verifier/src/poc.rs @@ -57,9 +57,6 @@ lazy_static! { static ref MAX_WITNESS_LAG: Duration = Duration::milliseconds(1500); /// max permitted lag between the beaconer and a witness static ref MAX_BEACON_TO_WITNESS_LAG: Duration = Duration::milliseconds(4000); - /// the duration in which a beaconer or witness must have a valid opposite report from - static ref RECIPROCITY_WINDOW: Duration = Duration::hours(48); - } #[derive(Debug, PartialEq)] pub struct InvalidResponse { @@ -70,8 +67,8 @@ pub struct InvalidResponse { pub struct Poc { pool: PgPool, beacon_interval: Duration, - beacon_report: IotBeaconIngestReport, - witness_reports: Vec, + pub beacon_report: IotBeaconIngestReport, + pub witness_reports: Vec, entropy_start: DateTime, entropy_end: DateTime, entropy_version: i32, @@ -118,7 +115,6 @@ impl Poc { gateway_cache: &GatewayCache, region_cache: &RegionCache, deny_list: &DenyList, - witness_updater: &WitnessUpdater, ) -> anyhow::Result where G: Gateways, @@ -176,21 +172,9 @@ impl Poc { &beaconer_pub_key, self.beacon_report.received_timestamp, ) - .await?; - } - - // post regular validations, check for beacon reciprocity - // if this check fails we will invalidate the beacon - // even tho it has passed all regular validations - if !self.verify_beacon_reciprocity(witness_updater).await? { - Ok(VerifyBeaconResult::invalid( - InvalidReason::GatewayNoValidWitnesses, - None, - beaconer_info, - )) - } else { - Ok(VerifyBeaconResult::valid(beaconer_info, tx_scale)) + .await? } + Ok(VerifyBeaconResult::valid(beaconer_info, tx_scale)) } Err(invalid_response) => Ok(VerifyBeaconResult::invalid( invalid_response.reason, @@ -233,7 +217,7 @@ impl Poc { ) .await { - Ok(mut verified_witness) => { + Ok(verified_witness) => { // track which gateways we have saw a witness report from existing_gateways.push(verified_witness.report.pub_key.clone()); if verified_witness.status == VerificationStatus::Valid { @@ -242,17 +226,6 @@ impl Poc { id: witness_report.report.pub_key.clone(), timestamp: verified_witness.received_timestamp, }); - - // post regular validations, check for witness reciprocity - // if this check fails we will invalidate the witness - // even tho it has passed all regular validations - if !self.verify_witness_reciprocity(&witness_report).await? { - verified_witness.status = VerificationStatus::Invalid; - verified_witness.invalid_reason = - InvalidReason::GatewayNoValidBeacons; - verified_witness.participant_side = - InvalidParticipantSide::Witness; - } }; verified_witnesses.push(verified_witness); } @@ -383,31 +356,6 @@ impl Poc { )), } } - - async fn verify_beacon_reciprocity( - &self, - witness_updater: &WitnessUpdater, - ) -> anyhow::Result { - if !self.witness_reports.is_empty() { - let last_witness = witness_updater - .get_last_witness(&self.beacon_report.report.pub_key) - .await?; - return Ok(last_witness.map_or(false, |lw| { - self.beacon_report.received_timestamp - lw.timestamp < *RECIPROCITY_WINDOW - })); - } - Ok(false) - } - - async fn verify_witness_reciprocity( - &self, - report: &IotWitnessIngestReport, - ) -> anyhow::Result { - let last_beacon = LastBeacon::get(&self.pool, &report.report.pub_key).await?; - Ok(last_beacon.map_or(false, |lw| { - report.received_timestamp - lw.timestamp < *RECIPROCITY_WINDOW - })) - } } #[allow(clippy::too_many_arguments)] diff --git a/iot_verifier/src/runner.rs b/iot_verifier/src/runner.rs index 4d420ec7c..946695c6c 100644 --- a/iot_verifier/src/runner.rs +++ b/iot_verifier/src/runner.rs @@ -1,6 +1,7 @@ use crate::{ gateway_cache::GatewayCache, hex_density::HexDensityMap, + last_beacon::LastBeacon, poc::{Poc, VerifyBeaconResult}, poc_report::Report, region_cache::RegionCache, @@ -22,10 +23,11 @@ use file_store::{ }; use futures::{future::LocalBoxFuture, stream, StreamExt, TryFutureExt}; use helium_proto::services::poc_lora::{ - InvalidParticipantSide, InvalidReason, LoraInvalidBeaconReportV1, LoraInvalidWitnessReportV1, - LoraPocV1, VerificationStatus, + InvalidDetails, InvalidParticipantSide, InvalidReason, LoraInvalidBeaconReportV1, + LoraInvalidWitnessReportV1, LoraPocV1, VerificationStatus, }; -use iot_config::client::Gateways; +use iot_config::{client::Gateways, gateway_info::GatewayInfo}; +use lazy_static::lazy_static; use rust_decimal::{Decimal, MathematicalOps}; use rust_decimal_macros::dec; use sqlx::PgPool; @@ -41,6 +43,11 @@ const WITNESS_REDUNDANCY: u32 = 4; const POC_REWARD_DECAY_RATE: Decimal = dec!(0.8); const HIP15_TX_REWARD_UNIT_CAP: Decimal = Decimal::TWO; +lazy_static! { +/// the duration in which a beaconer or witness must have a valid opposite report from + static ref RECIPROCITY_WINDOW: ChronoDuration = ChronoDuration::hours(48); +} + pub struct Runner { pub pool: PgPool, pub beacon_interval: ChronoDuration, @@ -226,7 +233,7 @@ where } async fn handle_beacon_report(&self, db_beacon: Report) -> anyhow::Result<()> { - // TODO: look at wrapping all db access from this point onwards in a transaction + // get the beacon report and any associated witnesses and then generate a POC let entropy_start_time = match db_beacon.timestamp { Some(v) => v, None => return Ok(()), @@ -239,8 +246,6 @@ where let beacon_buf: &[u8] = &db_beacon.report_data; let beacon_report = IotBeaconIngestReport::decode(beacon_buf)?; - let beacon = &beacon_report.report; - let beacon_received_ts = beacon_report.received_timestamp; let db_witnesses = Report::get_witnesses_for_beacon(&self.pool, packet_data, self.witness_max_retries) @@ -248,24 +253,27 @@ where let witness_len = db_witnesses.len(); tracing::debug!("found {witness_len} witness for beacon"); - // get the beacon and witness report PBs from the db reports - let mut witnesses: Vec = Vec::new(); - for db_witness in db_witnesses { - let witness_buf: &[u8] = &db_witness.report_data; - witnesses.push(IotWitnessIngestReport::decode(witness_buf)?); - } + // decode the protobuf witness reports + let witnesses = db_witnesses + .into_iter() + .map(|w| IotWitnessIngestReport::decode(w.report_data.as_slice())) + .collect::, _>>()?; // create the struct defining this POC - let mut poc = Poc::new( + let poc = Poc::new( self.pool.clone(), self.beacon_interval, - beacon_report.clone(), - witnesses.clone(), + beacon_report, + witnesses, entropy_start_time, entropy_version, ) .await; + self.verify_poc(poc).await + } + + async fn verify_poc(&self, mut poc: Poc) -> anyhow::Result<()> { // verify POC beacon let beacon_verify_result = poc .verify_beacon( @@ -273,129 +281,187 @@ where &self.gateway_cache, &self.region_cache, &self.deny_list, - &self.witness_updater, ) .await?; - match beacon_verify_result.result { - VerificationStatus::Valid => { + match beacon_verify_result { + VerifyBeaconResult { + result: VerificationStatus::Valid, + gateway_info: Some(beacon_info), + .. + } => { // beacon is valid, verify the POC witnesses - if let Some(beacon_info) = beacon_verify_result.gateway_info { - let verified_witnesses_result = poc - .verify_witnesses( - &beacon_info, - &self.hex_density_map, - &self.gateway_cache, - &self.deny_list, - &self.witness_updater, - ) - .await?; - - // check if there are any failed witnesses - // if so update the DB attempts count - // and halt here, let things be reprocessed next tick - // if a witness continues to fail it will eventually - // be discarded from the list returned for the beacon - // thus one or more failing witnesses will not block the overall POC - if !verified_witnesses_result.failed_witnesses.is_empty() { - tracing::warn!("failed to handle witness"); - for failed_witness_report in verified_witnesses_result.failed_witnesses { - let failed_witness = failed_witness_report.report; - let id = - failed_witness.report_id(failed_witness_report.received_timestamp); - Report::update_attempts(&self.pool, &id, Utc::now()).await?; - } - return Ok(()); - }; - - let max_witnesses_per_poc = self.max_witnesses_per_poc as usize; - - // filter witnesses into selected and unselected lists - // the selected list will contain only valid witnesses - // up to a max count equal to `max_witnesses_per_poc` - // these witnesses will be rewarded - // the unselected list will contain potentially a mix of - // valid and invalid witnesses - // none of which will be rewarded - // we exclude self witnesses from the unselected lists - // these are dropped to the floor, never make it to s3 - let (mut selected_witnesses, invalid_witnesses) = - filter_witnesses(verified_witnesses_result.verified_witnesses); - - // keep a subset of our selected and valid witnesses - let mut unselected_witnesses = - sort_and_split_witnesses(&mut selected_witnesses, max_witnesses_per_poc)?; - - // concat the unselected valid witnesses and the invalid witnesses - // these will then form the unselected list on the poc - unselected_witnesses = - [&unselected_witnesses[..], &invalid_witnesses[..]].concat(); - - // get the number of valid witnesses in our selected list - let num_valid_selected_witnesses = selected_witnesses.len(); - - // get reward units based on the count of valid selected witnesses - let beaconer_reward_units = - poc_beaconer_reward_unit(num_valid_selected_witnesses as u32)?; - let witness_reward_units = - poc_per_witness_reward_unit(num_valid_selected_witnesses as u32)?; - // update the reward units for those valid witnesses within our selected list - selected_witnesses - .iter_mut() - .for_each(|witness| match witness.status { - VerificationStatus::Valid => witness.reward_unit = witness_reward_units, - VerificationStatus::Invalid => witness.reward_unit = Decimal::ZERO, - }); - - // metadata at this point will always be Some... - let (location, gain, elevation) = match beacon_info.metadata { - Some(metadata) => { - (Some(metadata.location), metadata.gain, metadata.elevation) - } - None => (None, 0, 0), - }; - - let valid_beacon_report = IotValidBeaconReport { - received_timestamp: beacon_received_ts, - location, - gain, - elevation, - hex_scale: beacon_verify_result - .hex_scale - .ok_or(RunnerError::NotFound("invalid hex scaling factor"))?, - report: beacon.clone(), - reward_unit: beaconer_reward_units, - }; - self.handle_valid_poc( - valid_beacon_report, - selected_witnesses, - unselected_witnesses, + let verified_witnesses_result = poc + .verify_witnesses( + &beacon_info, + &self.hex_density_map, + &self.gateway_cache, + &self.deny_list, + &self.witness_updater, ) .await?; + + // check if there are any failed witnesses + // if so update the DB attempts count + // and halt here, let things be reprocessed next tick + // if a witness continues to fail it will eventually + // be discarded from the list returned for the beacon + // thus one or more failing witnesses will not block the overall POC + if !verified_witnesses_result.failed_witnesses.is_empty() { + tracing::warn!("failed to handle witness"); + for failed_witness_report in verified_witnesses_result.failed_witnesses { + let failed_witness = failed_witness_report.report; + let id = failed_witness.report_id(failed_witness_report.received_timestamp); + Report::update_attempts(&self.pool, &id, Utc::now()).await?; + } + return Ok(()); + }; + + if !self.verify_beacon_reciprocity(&poc.beacon_report).await? { + return self + .handle_invalid_poc( + poc, + InvalidReason::GatewayNoValidWitnesses, + None, + Some(beacon_info), + ) + .await; } + + let verified_witnesses = self + .verify_witnesses_reciprocity(verified_witnesses_result.verified_witnesses) + .await?; + + self.handle_valid_poc( + poc, + beacon_info, + beacon_verify_result.hex_scale, + verified_witnesses, + ) + .await } - VerificationStatus::Invalid => { + _ => { // the beacon is invalid, which in turn renders all witnesses invalid - self.handle_invalid_poc(beacon_verify_result, &beacon_report, witnesses) - .await?; + self.handle_invalid_poc( + poc, + beacon_verify_result.invalid_reason, + beacon_verify_result.invalid_details, + beacon_verify_result.gateway_info, + ) + .await } } + } + + async fn handle_valid_poc( + &self, + poc: Poc, + beacon_info: GatewayInfo, + beacon_hex_scale: Option, + verified_witnesses: Vec, + ) -> anyhow::Result<()> { + let beacon_received_ts = poc.beacon_report.received_timestamp; + let packet_data = poc.beacon_report.report.data.clone(); + let beacon_report_id = poc.beacon_report.report.report_id(beacon_received_ts); + + let max_witnesses_per_poc = self.max_witnesses_per_poc as usize; + + // filter witnesses into selected and unselected lists + // the selected list will contain only valid witnesses + // up to a max count equal to `max_witnesses_per_poc` + // these witnesses will be rewarded + // the unselected list will contain potentially a mix of + // valid and invalid witnesses + // none of which will be rewarded + // we exclude self witnesses from the unselected lists + // these are dropped to the floor, never make it to s3 + let (mut selected_witnesses, invalid_witnesses) = filter_witnesses(verified_witnesses); + + // keep a subset of our selected and valid witnesses + let mut unselected_witnesses = + sort_and_split_witnesses(&mut selected_witnesses, max_witnesses_per_poc)?; + + // concat the unselected valid witnesses and the invalid witnesses + // these will then form the unselected list on the poc + unselected_witnesses = [&unselected_witnesses[..], &invalid_witnesses[..]].concat(); + + // get the number of valid witnesses in our selected list + let num_valid_selected_witnesses = selected_witnesses.len(); + + // get reward units based on the count of valid selected witnesses + let beaconer_reward_units = poc_beaconer_reward_unit(num_valid_selected_witnesses as u32)?; + let witness_reward_units = + poc_per_witness_reward_unit(num_valid_selected_witnesses as u32)?; + // update the reward units for those valid witnesses within our selected list + selected_witnesses + .iter_mut() + .for_each(|witness| match witness.status { + VerificationStatus::Valid => witness.reward_unit = witness_reward_units, + VerificationStatus::Invalid => witness.reward_unit = Decimal::ZERO, + }); + + // metadata at this point will always be Some... + let (location, gain, elevation) = match beacon_info.metadata { + Some(metadata) => (Some(metadata.location), metadata.gain, metadata.elevation), + None => (None, 0, 0), + }; + + let valid_beacon_report = IotValidBeaconReport { + received_timestamp: beacon_received_ts, + location, + gain, + elevation, + hex_scale: beacon_hex_scale + .ok_or(RunnerError::NotFound("invalid hex scaling factor"))?, + report: poc.beacon_report.report.clone(), + reward_unit: beaconer_reward_units, + }; + + let iot_poc: IotPoc = IotPoc { + poc_id: beacon_report_id.clone(), + beacon_report: valid_beacon_report, + selected_witnesses: selected_witnesses.clone(), + unselected_witnesses: unselected_witnesses.clone(), + }; + + let mut transaction = self.pool.begin().await?; + for reward_share in GatewayPocShare::shares_from_poc(&iot_poc) { + reward_share.save(&mut transaction).await?; + } + transaction.commit().await?; + + let poc_proto: LoraPocV1 = iot_poc.into(); + // save the poc to s3, if write fails update attempts and go no further + // allow the poc to be reprocessed next tick + match self.poc_sink.write(poc_proto, []).await { + Ok(_) => (), + Err(err) => { + tracing::error!("failed to save invalid_witness_report to s3, {err}"); + Report::update_attempts(&self.pool, &beacon_report_id, Utc::now()).await?; + return Ok(()); + } + } + // write out metrics for any witness which failed verification + fire_invalid_witness_metric(&selected_witnesses); + fire_invalid_witness_metric(&unselected_witnesses); + + Report::delete_poc(&self.pool, &packet_data).await?; + telemetry::decrement_num_beacons(); Ok(()) } async fn handle_invalid_poc( &self, - beacon_verify_result: VerifyBeaconResult, - beacon_report: &IotBeaconIngestReport, - witness_reports: Vec, + poc: Poc, + beacon_invalid_reason: InvalidReason, + beacon_invalid_details: Option, + beacon_info: Option, ) -> anyhow::Result<()> { // the beacon is invalid, which in turn renders all witnesses invalid - let beacon = &beacon_report.report; + let beacon = &poc.beacon_report.report; let beacon_id = beacon.data.clone(); - let beacon_report_id = beacon_report.ingest_id(); - let beacon_invalid_reason = beacon_verify_result.invalid_reason; - let beacon_invalid_details = beacon_verify_result.invalid_details; + let beacon_report_id = poc.beacon_report.ingest_id(); - let (location, elevation, gain) = match beacon_verify_result.gateway_info { + let (location, elevation, gain) = match beacon_info { Some(gateway_info) => match gateway_info.metadata { Some(metadata) => (Some(metadata.location), metadata.elevation, metadata.gain), None => (None, 0, 0), @@ -404,7 +470,7 @@ where }; let invalid_poc: IotInvalidBeaconReport = IotInvalidBeaconReport { - received_timestamp: beacon_report.received_timestamp, + received_timestamp: poc.beacon_report.received_timestamp, reason: beacon_invalid_reason, invalid_details: beacon_invalid_details.clone(), report: beacon.clone(), @@ -419,7 +485,7 @@ where .invalid_beacon_sink .write( invalid_poc_proto, - &[("reason", beacon_verify_result.invalid_reason.as_str_name())], + &[("reason", beacon_invalid_reason.as_str_name())], ) .await { @@ -435,7 +501,7 @@ where // we will have to clean out any successful writes of other witnesses // and also the invalid poc // so if a report fails from this point on, it shall be lost for ever more - for witness_report in witness_reports { + for witness_report in poc.witness_reports { let invalid_witness_report: IotInvalidWitnessReport = IotInvalidWitnessReport { received_timestamp: witness_report.received_timestamp, report: witness_report.report, @@ -449,7 +515,7 @@ where .invalid_witness_sink .write( invalid_witness_report_proto, - &[("reason", beacon_verify_result.invalid_reason.as_str_name())], + &[("reason", beacon_invalid_reason.as_str_name())], ) .await { @@ -465,52 +531,46 @@ where Ok(()) } - async fn handle_valid_poc( + async fn verify_beacon_reciprocity( &self, - valid_beacon_report: IotValidBeaconReport, - selected_witnesses: Vec, - unselected_witnesses: Vec, - ) -> anyhow::Result<()> { - let received_timestamp = valid_beacon_report.received_timestamp; - let beacon_id = valid_beacon_report.report.report_id(received_timestamp); - let packet_data = valid_beacon_report.report.data.clone(); - let beacon_report_id = valid_beacon_report.report.report_id(received_timestamp); - let iot_poc: IotPoc = IotPoc { - poc_id: beacon_id, - beacon_report: valid_beacon_report, - selected_witnesses: selected_witnesses.clone(), - unselected_witnesses: unselected_witnesses.clone(), - }; - - let mut transaction = self.pool.begin().await?; - for reward_share in GatewayPocShare::shares_from_poc(&iot_poc) { - reward_share.save(&mut transaction).await?; - } - // TODO: expand this transaction to cover all of the database access below? - transaction.commit().await?; + beacon_report: &IotBeaconIngestReport, + ) -> anyhow::Result { + let last_witness = self + .witness_updater + .get_last_witness(&beacon_report.report.pub_key) + .await?; + Ok(last_witness.map_or(false, |lw| { + beacon_report.received_timestamp - lw.timestamp < *RECIPROCITY_WINDOW + })) + } - let poc_proto: LoraPocV1 = iot_poc.into(); - // save the poc to s3, if write fails update attempts and go no further - // allow the poc to be reprocessed next tick - match self.poc_sink.write(poc_proto, []).await { - Ok(_) => (), - Err(err) => { - tracing::error!("failed to save invalid_witness_report to s3, {err}"); - Report::update_attempts(&self.pool, &beacon_report_id, Utc::now()).await?; - return Ok(()); + async fn verify_witnesses_reciprocity( + &self, + witnesses: Vec, + ) -> anyhow::Result> { + let mut verified_witnesses = Vec::new(); + for mut witness in witnesses { + if witness.status == VerificationStatus::Valid + && !self.verify_witness_reciprocity(&witness).await? + { + witness.invalid_reason = InvalidReason::GatewayNoValidBeacons; + witness.status = VerificationStatus::Invalid; + witness.invalid_details = None; + witness.participant_side = InvalidParticipantSide::Witness } + verified_witnesses.push(witness) } - // write out metrics for any witness which failed verification - // TODO: work our approach that doesn't require the prior cloning of - // the selected and unselected witnesses vecs - // tried to do this directly from the now discarded poc_proto - // but could nae get it to get a way past the lack of COPY - fire_invalid_witness_metric(&selected_witnesses); - fire_invalid_witness_metric(&unselected_witnesses); + Ok(verified_witnesses) + } - Report::delete_poc(&self.pool, &packet_data).await?; - telemetry::decrement_num_beacons(); - Ok(()) + async fn verify_witness_reciprocity( + &self, + report: &IotVerifiedWitnessReport, + ) -> anyhow::Result { + let last_beacon = LastBeacon::get(&self.pool, &report.report.pub_key).await?; + Ok(last_beacon.map_or(false, |lw| { + report.received_timestamp - lw.timestamp < *RECIPROCITY_WINDOW + })) } } diff --git a/iot_verifier/tests/runner_tests.rs b/iot_verifier/tests/runner_tests.rs index 3bdb0f8d2..6ebbaef48 100644 --- a/iot_verifier/tests/runner_tests.rs +++ b/iot_verifier/tests/runner_tests.rs @@ -981,24 +981,18 @@ async fn valid_new_gateway_beacon_first_reciprocity(pool: PgPool) -> anyhow::Res } #[sqlx::test] -async fn valid_beacon_and_no_witnesses(pool: PgPool) -> anyhow::Result<()> { +async fn valid_lone_wolf_beacon(pool: PgPool) -> anyhow::Result<()> { let test_beacon_interval = ChronoDuration::seconds(5); let mut ctx = TestContext::setup(pool.clone(), test_beacon_interval).await?; let now = ctx.entropy_ts; - // simulate a gateway submitting a beacon which is not witnessed by other gateways - // this replicates a scenario whereby a gateway cannot broadcast due to a hardware failure - // but yet continues to submit beacon reports to the oracle - // also simulates a lone wolf gateway, broadcasting and no one around to hear it + // simulate a lone wolf gateway, broadcasting and no one around to hear it // the gateway uses beaconer1 pubkey - // the gateway will not be able to successfully witness other gateways due to reciprocity - // until it has successfully had a beacon witnessed by another gateway // // step 1 - generate a beacon from beaconer1, - // this beacon will be valid but will fail reciprocity check as there are no witnesses - // from other gateways for the beacon - // last beacon timestamp will NOT as there are no witnesses + // this beacon will be valid but will fail reciprocity check as the gateway has not previously witnessed + // last beacon timestamp will NOT be updated as there are no witnesses to the beacon // let beacon_to_inject = common::create_valid_beacon_report(common::BEACONER1, ctx.entropy_ts); common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; @@ -1022,9 +1016,13 @@ async fn valid_beacon_and_no_witnesses(pool: PgPool) -> anyhow::Result<()> { // // step 2 + // confirm beaconer1's last beacon timestamp was not updated + // the gateway should not be able to witness a beacon and will fail reciprocity check + // generate a beacon from beaconer5 and have it witnessed by beaconer1 - // the witness will fail reciprocity check as beaconer1 does not have - // a current last beacon timestamp + // the beacon will be successful but the witness will fail reciprocity check + // as beaconer1 does not have a current last beacon timestamp + // beaconer1's last witness timestamp will be updated ( as its witness was structurally valid ) // let beacon_to_inject = common::create_valid_beacon_report(common::BEACONER5, ctx.entropy_ts); @@ -1032,7 +1030,8 @@ async fn valid_beacon_and_no_witnesses(pool: PgPool) -> anyhow::Result<()> { common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; common::inject_witness_report(pool.clone(), witness_to_inject.clone()).await?; - // inject last beacon & witness timestamps into the DB for beaconer 5 - allow it to pass reciprocity checks + // inject last beacon & witness timestamps into the DB for beaconer 5 + // allow it to pass reciprocity checks let mut txn = pool.begin().await?; common::inject_last_beacon( &mut txn, @@ -1080,45 +1079,102 @@ async fn valid_beacon_and_no_witnesses(pool: PgPool) -> anyhow::Result<()> { InvalidParticipantSide::Witness as i32, invalid_witness_report.participant_side ); + Ok(()) +} + +#[sqlx::test] +async fn valid_two_isolated_gateways_beaconing_and_witnessing(pool: PgPool) -> anyhow::Result<()> { + let test_beacon_interval = ChronoDuration::seconds(5); + let mut ctx = TestContext::setup(pool.clone(), test_beacon_interval).await?; + + // simulate two gateways with no recent activity coming online and + // witnessing each others beacons // - // step 3 - // generate a second beacon attempt from beaconer1 - // and witness the beacon from another gateway - // this beacon will pass the reciprocity check now that it has an associated witness - // the witness itself will also pass the reciprocity check as we pre seed necessary timestamps + // step 1 + // generate a beacon from gateway1 with gateway 2 as witness + // this beacon will be valid but will fail reciprocity check as no previous witness + // gateway 1's last beacon timestamp will be updated + // gateway 2's last witness timestamp will be updated + // + let beacon_to_inject = common::create_valid_beacon_report(common::BEACONER1, ctx.entropy_ts); + let witness_to_inject = common::create_valid_witness_report(common::WITNESS1, ctx.entropy_ts); + common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; + common::inject_witness_report(pool.clone(), witness_to_inject.clone()).await?; + + ctx.runner.handle_db_tick().await?; + + let invalid_beacon = ctx.invalid_beacons.receive_invalid_beacon().await; + let invalid_beacon_report = invalid_beacon.report.clone().unwrap(); + println!("{:?}", invalid_beacon); + // assert the pubkeys in the outputted reports + // match those which we injected + assert_eq!( + PublicKeyBinary::from(invalid_beacon_report.pub_key.clone()), + PublicKeyBinary::from_str(common::BEACONER1).unwrap() + ); + // assert the invalid details + assert_eq!( + InvalidReason::GatewayNoValidWitnesses as i32, + invalid_beacon.reason + ); + // + // step 2 + // generate a beacon from gateway2 with gateway 1 as witness, + // this beacon will be valid and will pass reciprocity check as the gateway 2 witnessed previously + // gateway 2's last beacon timestamp will be updated + // gateway 1's last witness timestamp will be updated + // + + let beacon_to_inject = common::create_valid_beacon_report(common::WITNESS1, ctx.entropy_ts); + let witness_to_inject = common::create_valid_witness_report(common::BEACONER1, ctx.entropy_ts); + common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; + common::inject_witness_report(pool.clone(), witness_to_inject.clone()).await?; + + ctx.runner.handle_db_tick().await?; + + let valid_poc = ctx.valid_pocs.receive_valid_poc().await; + println!("{:?}", valid_poc); + assert_eq!(1, valid_poc.selected_witnesses.len()); + assert_eq!(0, valid_poc.unselected_witnesses.len()); + let valid_beacon = valid_poc.beacon_report.unwrap().report.clone().unwrap(); + let valid_witness_report = valid_poc.selected_witnesses[0].clone(); + let valid_witness = valid_witness_report.report.unwrap(); + // assert the pubkeys in the outputted reports + // match those which we injected + assert_eq!( + PublicKeyBinary::from(valid_beacon.pub_key.clone()), + PublicKeyBinary::from_str(common::WITNESS1).unwrap() + ); + assert_eq!( + PublicKeyBinary::from(valid_witness.pub_key.clone()), + PublicKeyBinary::from_str(common::BEACONER1).unwrap() + ); + // assert the witness reports status + assert_eq!( + VerificationStatus::Valid as i32, + valid_witness_report.status + ); - // sleep to ensure the second beacon fits with the beaconing interval - tokio::time::sleep(Duration::from_secs(5)).await; + // + // step 3 + // generate a beacon from gateway1 with gateway 2 as witness, + // this beacon will be valid and pass reciprocity check as gateway 1 previously witnessed + // gateway 2 has previously beaconed and so its witness will also be valid + // let beacon_to_inject = common::create_valid_beacon_report( common::BEACONER1, - ctx.entropy_ts + test_beacon_interval, + ctx.entropy_ts + ChronoDuration::seconds(5), ); let witness_to_inject = common::create_valid_witness_report( common::WITNESS1, - ctx.entropy_ts + test_beacon_interval, + ctx.entropy_ts + ChronoDuration::seconds(5), ); common::inject_beacon_report(pool.clone(), beacon_to_inject.clone()).await?; common::inject_witness_report(pool.clone(), witness_to_inject.clone()).await?; - // seed last beacons and witness reports into the DB for witnesser - let mut txn = pool.begin().await?; - common::inject_last_beacon( - &mut txn, - witness_to_inject.report.pub_key.clone(), - now - (test_beacon_interval + ChronoDuration::seconds(10)), - ) - .await?; - common::inject_last_witness( - &mut txn, - witness_to_inject.report.pub_key.clone(), - now - (test_beacon_interval + ChronoDuration::seconds(10)), - ) - .await?; - txn.commit().await?; - ctx.runner.handle_db_tick().await?; let valid_poc = ctx.valid_pocs.receive_valid_poc().await; @@ -1143,5 +1199,6 @@ async fn valid_beacon_and_no_witnesses(pool: PgPool) -> anyhow::Result<()> { VerificationStatus::Valid as i32, valid_witness_report.status ); + Ok(()) }