diff --git a/file_store/src/iot_packet.rs b/file_store/src/iot_packet.rs index 477bc79af..e337c4a19 100644 --- a/file_store/src/iot_packet.rs +++ b/file_store/src/iot_packet.rs @@ -18,6 +18,7 @@ pub struct PacketRouterPacketReport { pub oui: u64, pub net_id: u32, pub rssi: i32, + pub free: bool, /// Frequency in Hz pub frequency: u32, pub snr: f32, @@ -90,6 +91,7 @@ impl TryFrom for PacketRouterPacketReport { oui: v.oui, net_id: v.net_id, rssi: v.rssi, + free: v.free, frequency: v.frequency, snr: v.snr, data_rate, diff --git a/iot_packet_verifier/src/verifier.rs b/iot_packet_verifier/src/verifier.rs index 894e4b3e3..4f632d23a 100644 --- a/iot_packet_verifier/src/verifier.rs +++ b/iot_packet_verifier/src/verifier.rs @@ -71,7 +71,11 @@ where continue; } - let debit_amount = payload_size_to_dc(report.payload_size as u64); + let debit_amount = if report.free { + 0 + } else { + payload_size_to_dc(report.payload_size as u64) + }; let payer = self .config_server diff --git a/iot_packet_verifier/tests/integration_tests.rs b/iot_packet_verifier/tests/integration_tests.rs index 461c528e1..b20587b36 100644 --- a/iot_packet_verifier/tests/integration_tests.rs +++ b/iot_packet_verifier/tests/integration_tests.rs @@ -93,12 +93,14 @@ fn packet_report( timestamp: u64, payload_size: u32, payload_hash: Vec, + free: bool, ) -> PacketRouterPacketReport { PacketRouterPacketReport { received_timestamp: Utc.timestamp_opt(timestamp as i64, 0).unwrap(), oui, net_id: 0, rssi: 0, + free, frequency: 0, snr: 0.0, data_rate: DataRate::Fsk50, @@ -121,6 +123,7 @@ fn join_packet_report( oui, net_id: 0, rssi: 0, + free: true, frequency: 0, snr: 0.0, data_rate: DataRate::Fsk50, @@ -132,12 +135,21 @@ fn join_packet_report( } } -fn valid_packet(timestamp: u64, payload_size: u32, payload_hash: Vec) -> ValidPacket { +fn valid_packet( + timestamp: u64, + payload_size: u32, + payload_hash: Vec, + paid: bool, +) -> ValidPacket { ValidPacket { payload_size, payload_hash, gateway: vec![], - num_dcs: payload_size_to_dc(payload_size as u64) as u32, + num_dcs: if paid { + payload_size_to_dc(payload_size as u64) as u32 + } else { + 0 + }, packet_timestamp: timestamp, } } @@ -194,9 +206,9 @@ async fn test_config_unlocking() { 1, balances.clone(), stream::iter(vec![ - packet_report(0, 0, 24, vec![1]), - packet_report(0, 1, 48, vec![2]), - packet_report(0, 2, 1, vec![3]), + packet_report(0, 0, 24, vec![1], false), + packet_report(0, 1, 48, vec![2], false), + packet_report(0, 2, 1, vec![3], false), ]), &mut valid_packets, &mut invalid_packets, @@ -252,9 +264,9 @@ async fn test_config_unlocking() { 1, balances.clone(), stream::iter(vec![ - packet_report(0, 0, 24, vec![1]), - packet_report(0, 1, 48, vec![2]), - packet_report(0, 2, 1, vec![3]), + packet_report(0, 0, 24, vec![1], false), + packet_report(0, 1, 48, vec![2], false), + packet_report(0, 2, 1, vec![3], false), ]), &mut valid_packets, &mut invalid_packets, @@ -274,21 +286,87 @@ async fn test_config_unlocking() { ); } +#[tokio::test] +async fn test_verifier_free_packets() { + // Org packets + let packets = vec![ + packet_report(0, 0, 24, vec![4], true), + packet_report(0, 1, 48, vec![5], true), + packet_report(0, 2, 1, vec![6], true), + ]; + + let org_pubkey = PublicKeyBinary::from(vec![0]); + + // Set up orgs: + let orgs = MockConfigServer::default(); + orgs.insert(0_u64, org_pubkey.clone()).await; + + // Set up balances: + let mut balances = HashMap::new(); + balances.insert(org_pubkey.clone(), 5); + let balances = InstantlyBurnedBalance(Arc::new(Mutex::new(balances))); + + // Set up output: + let mut valid_packets = Vec::new(); + let mut invalid_packets = Vec::new(); + + // Set up verifier: + let mut verifier = Verifier { + debiter: balances.0.clone(), + config_server: orgs, + }; + // Run the verifier: + verifier + .verify( + 1, + balances.clone(), + stream::iter(packets), + &mut valid_packets, + &mut invalid_packets, + ) + .await + .unwrap(); + + // Verify packet reports: + assert_eq!( + valid_packets, + vec![ + valid_packet(0, 24, vec![4], false), + valid_packet(1000, 48, vec![5], false), + valid_packet(2000, 1, vec![6], false), + ] + ); + + assert!(invalid_packets.is_empty()); + + let payers = verifier.config_server.payers.lock().await; + assert!(payers.get(&0).unwrap().enabled); + + assert_eq!( + verifier + .debiter + .payer_balance(&org_pubkey) + .await + .expect("unchanged balance"), + 5 + ); +} + #[tokio::test] async fn test_verifier() { let packets = vec![ // Packets for first OUI - packet_report(0, 0, 24, vec![1]), - packet_report(0, 1, 48, vec![2]), - packet_report(0, 2, 1, vec![3]), + packet_report(0, 0, 24, vec![1], false), + packet_report(0, 1, 48, vec![2], false), + packet_report(0, 2, 1, vec![3], false), join_packet_report(0, 3, 1, vec![4]), // Packets for second OUI - packet_report(1, 0, 24, vec![4]), - packet_report(1, 1, 48, vec![5]), - packet_report(1, 2, 1, vec![6]), + packet_report(1, 0, 24, vec![4], false), + packet_report(1, 1, 48, vec![5], false), + packet_report(1, 2, 1, vec![6], false), join_packet_report(1, 3, 1, vec![4]), // Packets for third OUI - packet_report(2, 0, 24, vec![7]), + packet_report(2, 0, 24, vec![7], false), join_packet_report(2, 1, 1, vec![4]), ]; // Set up orgs: @@ -328,14 +406,14 @@ async fn test_verifier() { valid_packets, vec![ // First two packets for OUI #0 are valid - valid_packet(0, 24, vec![1]), - valid_packet(1000, 48, vec![2]), + valid_packet(0, 24, vec![1], true), + valid_packet(1000, 48, vec![2], true), // All packets for OUI #1 are valid - valid_packet(0, 24, vec![4]), - valid_packet(1000, 48, vec![5]), - valid_packet(2000, 1, vec![6]), + valid_packet(0, 24, vec![4], true), + valid_packet(1000, 48, vec![5], true), + valid_packet(2000, 1, vec![6], true), // All packets for OUI #2 are valid - valid_packet(0, 24, vec![7]), + valid_packet(0, 24, vec![7], true), ] ); @@ -398,10 +476,10 @@ async fn test_end_to_end() { 1, pending_burns.clone(), stream::iter(vec![ - packet_report(0, 0, BYTES_PER_DC as u32, vec![1]), - packet_report(0, 1, BYTES_PER_DC as u32, vec![2]), - packet_report(0, 2, BYTES_PER_DC as u32, vec![3]), - packet_report(0, 3, BYTES_PER_DC as u32, vec![4]), + packet_report(0, 0, BYTES_PER_DC as u32, vec![1], false), + packet_report(0, 1, BYTES_PER_DC as u32, vec![2], false), + packet_report(0, 2, BYTES_PER_DC as u32, vec![3], false), + packet_report(0, 3, BYTES_PER_DC as u32, vec![4], false), ]), &mut valid_packets, &mut invalid_packets, @@ -424,9 +502,9 @@ async fn test_end_to_end() { assert_eq!( valid_packets, vec![ - valid_packet(0, BYTES_PER_DC as u32, vec![1]), - valid_packet(1000, BYTES_PER_DC as u32, vec![2]), - valid_packet(2000, BYTES_PER_DC as u32, vec![3]), + valid_packet(0, BYTES_PER_DC as u32, vec![1], true), + valid_packet(1000, BYTES_PER_DC as u32, vec![2], true), + valid_packet(2000, BYTES_PER_DC as u32, vec![3], true), ] ); @@ -477,7 +555,13 @@ async fn test_end_to_end() { .verify( 1, pending_burns.clone(), - stream::iter(vec![packet_report(0, 4, BYTES_PER_DC as u32, vec![5])]), + stream::iter(vec![packet_report( + 0, + 4, + BYTES_PER_DC as u32, + vec![5], + false, + )]), &mut valid_packets, &mut invalid_packets, ) diff --git a/iot_verifier/src/packet_loader.rs b/iot_verifier/src/packet_loader.rs index e6a7f1f49..afdc1ba46 100644 --- a/iot_verifier/src/packet_loader.rs +++ b/iot_verifier/src/packet_loader.rs @@ -89,11 +89,15 @@ impl PacketLoader { file_info_stream .into_stream(&mut transaction) .await? - .map(|valid_packet| { - ( - ValidPacket::from(valid_packet.clone()), - GatewayDCShare::share_from_packet(&valid_packet), - ) + .filter_map(|valid_packet| async move { + if valid_packet.num_dcs > 0 { + Some(( + ValidPacket::from(valid_packet.clone()), + GatewayDCShare::share_from_packet(&valid_packet), + )) + } else { + None + } }) .map(anyhow::Ok) .try_fold(