diff --git a/Cargo.lock b/Cargo.lock index 4ff1a8f91..8bd05dd4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3301,31 +3301,23 @@ dependencies = [ name = "iot-packet-verifier" version = "0.1.0" dependencies = [ - "anchor-client", - "anchor-lang", "anyhow", "async-trait", "chrono", "clap 3.2.23", "config", - "data-credits", "db-store", "file-store", "futures", "futures-util", "helium-crypto", "helium-proto", - "helium-sub-daos", "http", "http-serde", "poc-metrics", "prost", "serde", - "sha2 0.10.6", - "solana-client", - "solana-program", - "solana-sdk", - "spl-token", + "solana", "sqlx", "thiserror", "tokio", @@ -3894,31 +3886,25 @@ dependencies = [ name = "mobile-packet-verifier" version = "0.1.0" dependencies = [ - "anchor-client", - "anchor-lang", "anyhow", "async-trait", "chrono", "clap 3.2.23", "config", - "data-credits", "db-store", "file-store", "futures", "futures-util", "helium-crypto", "helium-proto", - "helium-sub-daos", "http", "http-serde", + "mobile-config", "poc-metrics", "prost", "serde", "sha2 0.10.6", - "solana-client", - "solana-program", - "solana-sdk", - "spl-token", + "solana", "sqlx", "thiserror", "tokio", @@ -5889,6 +5875,27 @@ dependencies = [ "winapi", ] +[[package]] +name = "solana" +version = "0.1.0" +dependencies = [ + "anchor-client", + "anchor-lang", + "async-trait", + "data-credits", + "helium-crypto", + "helium-sub-daos", + "serde", + "sha2 0.9.9", + "solana-client", + "solana-program", + "solana-sdk", + "spl-token", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "solana-account-decoder" version = "1.14.15" diff --git a/Cargo.toml b/Cargo.toml index 5a42a8f93..be65c9680 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ members = [ "iot_config", "price", "mobile_config", + "solana", ] [workspace.package] @@ -29,6 +30,7 @@ edition = "2021" [workspace.dependencies] anchor-lang = "0.26.0" +anchor-client = "0.26" anyhow = {version = "1", features = ["backtrace"]} thiserror = "1" clap = {version = "3", features = ["derive"]} @@ -60,6 +62,8 @@ helium-proto = {git = "https://github.com/helium/proto", branch = "master", feat hextree = "*" solana-client = "1.14" solana-sdk = "1.14" +solana-program = "1.11" +spl-token = "3.5.0" reqwest = {version = "0", default-features=false, features = ["gzip", "json", "rustls-tls"]} beacon = {git = "https://github.com/helium/gateway-rs.git", branch = "main"} humantime = "2" diff --git a/iot_packet_verifier/Cargo.toml b/iot_packet_verifier/Cargo.toml index 658a643f8..3cfe06477 100644 --- a/iot_packet_verifier/Cargo.toml +++ b/iot_packet_verifier/Cargo.toml @@ -8,8 +8,6 @@ license.workspace = true [dependencies] anyhow = {workspace = true} -anchor-lang = {workspace = true} -anchor-client = "0.26" async-trait = {workspace = true} clap = {workspace = true} config = {workspace = true} @@ -20,14 +18,11 @@ futures-util = {workspace = true} file-store = {path = "../file_store"} helium-proto = {workspace = true} helium-crypto = {workspace = true, features = ["sqlx-postgres", "multisig", "solana"]} -solana-client = {workspace = true} -solana-sdk = {workspace = true} -solana-program = "1.11" -spl-token = "3.5.0" poc-metrics = {path = "../metrics"} prost = {workspace = true} serde = {workspace = true} sqlx = {workspace = true} +solana = {path = "../solana"} thiserror = {workspace = true} tokio = {workspace = true} tonic = {workspace = true} @@ -36,9 +31,7 @@ tracing-subscriber = {workspace = true} triggered = {workspace = true} http = {workspace = true} http-serde = {workspace = true} -sha2 = {workspace = true} -data-credits = {workspace = true} -helium-sub-daos = {workspace = true} + diff --git a/iot_packet_verifier/pkg/settings-template.toml b/iot_packet_verifier/pkg/settings-template.toml index a6edcd29d..135d516ee 100644 --- a/iot_packet_verifier/pkg/settings-template.toml +++ b/iot_packet_verifier/pkg/settings-template.toml @@ -6,27 +6,11 @@ # Cache location for generated verified reports; Required cache = "/var/data/verified-reports" -# Data credit burn period in minutes. (Default is 1) -# burn_period = 1 - -# Public key for the Data Credits Mint (devnet mint provided here) -dc_mint = "dcuc8Amr83Wz27ZkQ2K9NS6r8zRpf1J6cvArEBDZDmm" -# Public key for the DNT Mint (devnet mint provided here) -dnt_mint = "iotEVVZLEywoTn1QdwNPddxPWszn3zFhEot3MfL9fns" -# Public key for the HNT Mint (devnet mint provided here) -hnt_mint = "hntyVP6YFm1Hg25TN9WGLqM12b8TQmcknKrdu1oxWux" +# URL for the config server +org_url = "" -# Solana RPC. This may contain a secret -solana_rpc = "http://localhost:8899" -# Path to the keypair used to sign data credit burn solana transactions -burn_keypair = "" # Path to the keypair used to authorize config server rpc calls config_keypair = "" -# Solana cluster to use. "devnet" or "mainnet" -cluster = "devnet" - -# URL for the config server -org_url = "" # We will burn data credits from the solana chain every `burn_period` minutes. burn_period = 1 @@ -38,6 +22,18 @@ burn_period = 1 # default. enable_solana_integration = "false" +[solana] +# Solana RPC. This may contain a secret +rpc_url = "http://localhost:8899" +# Path to the keypair used to sign data credit burn solana transactions +burn_keypair = "" +# Solana cluster to use. "devnet" or "mainnet" +cluster = "devnet" +# Public key for the Data Credits Mint +dc_mint = "dcuc8Amr83Wz27ZkQ2K9NS6r8zRpf1J6cvArEBDZDmm" +# Public key for the DNT Mint (IOT mint) +dnt_mint = "iotEVVZLEywoTn1QdwNPddxPWszn3zFhEot3MfL9fns" + [database] url = "postgresql://postgres:password@localhost:5432/postgres" diff --git a/iot_packet_verifier/src/balances.rs b/iot_packet_verifier/src/balances.rs index 61f804c5c..f4a9a3d8a 100644 --- a/iot_packet_verifier/src/balances.rs +++ b/iot_packet_verifier/src/balances.rs @@ -1,10 +1,10 @@ use crate::{ pending_burns::{Burn, PendingBurns}, - solana::SolanaNetwork, verifier::Debiter, }; use futures_util::StreamExt; use helium_crypto::PublicKeyBinary; +use solana::SolanaNetwork; use std::{collections::HashMap, sync::Arc}; use tokio::sync::Mutex; diff --git a/iot_packet_verifier/src/burner.rs b/iot_packet_verifier/src/burner.rs index 1e00ac3d4..777b59f0f 100644 --- a/iot_packet_verifier/src/burner.rs +++ b/iot_packet_verifier/src/burner.rs @@ -1,8 +1,8 @@ use crate::{ balances::{BalanceCache, BalanceStore}, pending_burns::{Burn, PendingBurns}, - solana::SolanaNetwork, }; +use solana::SolanaNetwork; use std::time::Duration; use tokio::task; diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index 58ad732a3..f26a56afa 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -2,7 +2,6 @@ use crate::{ balances::BalanceCache, burner::Burner, settings::Settings, - solana::SolanaRpc, verifier::{CachedOrgClient, Verifier}, }; use anyhow::{bail, Error, Result}; @@ -14,8 +13,7 @@ use file_store::{ FileSinkBuilder, FileStore, FileType, }; use futures_util::TryFutureExt; -use solana_client::nonblocking::rpc_client::RpcClient; -use solana_sdk::signature::read_keypair_file; +use solana::SolanaRpc; use sqlx::{Pool, Postgres}; use std::sync::Arc; use tokio::sync::mpsc::Receiver; @@ -93,21 +91,11 @@ impl Cmd { sqlx::migrate!().run(&pool).await?; let solana = if settings.enable_solana_integration { - let burn_keypair = match read_keypair_file(&settings.burn_keypair) { - Ok(kp) => kp, - Err(e) => bail!("Failed to read keypair file ({})", e), + let Some(ref solana_settings) = settings.solana else { + bail!("Missing solana section in settings"); }; // Set up the solana RpcClient: - Some( - SolanaRpc::new( - RpcClient::new(settings.solana_rpc.clone()), - settings.cluster.clone(), - burn_keypair, - settings.dc_mint()?, - settings.dnt_mint()?, - ) - .await?, - ) + Some(SolanaRpc::new(solana_settings).await?) } else { None }; diff --git a/iot_packet_verifier/src/lib.rs b/iot_packet_verifier/src/lib.rs index 30ee23c3e..b7459206f 100644 --- a/iot_packet_verifier/src/lib.rs +++ b/iot_packet_verifier/src/lib.rs @@ -1,8 +1,6 @@ pub mod balances; pub mod burner; pub mod daemon; -pub mod pdas; pub mod pending_burns; pub mod settings; -pub mod solana; pub mod verifier; diff --git a/iot_packet_verifier/src/pdas.rs b/iot_packet_verifier/src/pdas.rs deleted file mode 100644 index 5b7d6b04d..000000000 --- a/iot_packet_verifier/src/pdas.rs +++ /dev/null @@ -1,22 +0,0 @@ -//! Functions for returning Program Derived Addresses in order to complete -//! actions on the Solana chain. - -use helium_crypto::PublicKeyBinary; -use sha2::{Digest, Sha256}; -use solana_sdk::pubkey::Pubkey; - -/// Returns the PDA for the Delegated Data Credits of the given `payer`. -pub fn delegated_data_credits(sub_dao: &Pubkey, payer: &PublicKeyBinary) -> Pubkey { - let mut hasher = Sha256::new(); - hasher.update(payer.to_string()); - let sha_digest = hasher.finalize(); - let (ddc_key, _) = Pubkey::find_program_address( - &[ - "delegated_data_credits".as_bytes(), - sub_dao.as_ref(), - &sha_digest, - ], - &data_credits::ID, - ); - ddc_key -} diff --git a/iot_packet_verifier/src/settings.rs b/iot_packet_verifier/src/settings.rs index 56fb366b7..2473dc9b2 100644 --- a/iot_packet_verifier/src/settings.rs +++ b/iot_packet_verifier/src/settings.rs @@ -2,7 +2,6 @@ use chrono::{DateTime, TimeZone, Utc}; use config::{Config, ConfigError, Environment, File}; use helium_proto::services::{iot_config::config_org_client::OrgClient, Channel, Endpoint}; use serde::Deserialize; -use solana_sdk::pubkey::{ParsePubkeyError, Pubkey}; use std::path::{Path, PathBuf}; #[derive(Debug, Deserialize)] @@ -13,18 +12,10 @@ pub struct Settings { pub log: String, /// Cache location for generated verified reports pub cache: String, - /// Solana RpcClient URL: - pub solana_rpc: String, - /// Path to the keypair for signing burn transactions - pub burn_keypair: PathBuf, /// Path to the keypair for signing config changes pub config_keypair: PathBuf, /// Data credit burn period in minutes. Default is 1. pub burn_period: u64, - pub cluster: String, - pub dc_mint: String, - pub dnt_mint: String, - pub hnt_mint: String, pub database: db_store::Settings, pub ingest: file_store::Settings, pub output: file_store::Settings, @@ -33,6 +24,7 @@ pub struct Settings { pub org_url: http::Uri, #[serde(default)] pub enable_solana_integration: bool, + pub solana: Option, #[serde(default = "default_start_after")] pub start_after: u64, } @@ -72,18 +64,6 @@ impl Settings { .and_then(|config| config.try_deserialize()) } - pub fn dc_mint(&self) -> Result { - self.dc_mint.parse() - } - - pub fn dnt_mint(&self) -> Result { - self.dnt_mint.parse() - } - - pub fn hnt_mint(&self) -> Result { - self.hnt_mint.parse() - } - pub fn connect_org(&self) -> OrgClient { OrgClient::new(Endpoint::from(self.org_url.clone()).connect_lazy()) } diff --git a/mobile_packet_verifier/Cargo.toml b/mobile_packet_verifier/Cargo.toml index 0c4e7032b..6dc4edcdd 100644 --- a/mobile_packet_verifier/Cargo.toml +++ b/mobile_packet_verifier/Cargo.toml @@ -8,8 +8,6 @@ license.workspace = true [dependencies] anyhow = {workspace = true} -anchor-lang = {workspace = true} -anchor-client = "0.26" async-trait = {workspace = true} clap = {workspace = true} config = {workspace = true} @@ -20,14 +18,12 @@ futures-util = {workspace = true} file-store = {path = "../file_store"} helium-proto = {workspace = true} helium-crypto = {workspace = true, features = ["sqlx-postgres", "multisig", "solana"]} -solana-client = {workspace = true} -solana-sdk = {workspace = true} -solana-program = "1.11" -spl-token = "3.5.0" poc-metrics = {path = "../metrics"} prost = {workspace = true} serde = {workspace = true} sqlx = {workspace = true} +solana = {path = "../solana"} +mobile-config = {path = "../mobile_config"} thiserror = {workspace = true} tokio = {workspace = true} tonic = {workspace = true} @@ -37,5 +33,3 @@ triggered = {workspace = true} http = {workspace = true} http-serde = {workspace = true} sha2 = {workspace = true} -data-credits = {workspace = true} -helium-sub-daos = {workspace = true} diff --git a/mobile_packet_verifier/pkg/settings-template.toml b/mobile_packet_verifier/pkg/settings-template.toml new file mode 100644 index 000000000..827b0cc7d --- /dev/null +++ b/mobile_packet_verifier/pkg/settings-template.toml @@ -0,0 +1,77 @@ + +# log settings for the application (RUST_LOG format). Default below +# +# log = "mobile_4packet_verifier=debug" + +# Cache location for generated verified reports; Required +cache = "/var/data/verified-reports" + +# We will burn data credits from the solana chain every `burn_period` hours. +# Default value is 1 hour. +burn_period = 1 + +# If set to true, enables integration with the Solana network. This includes +# checking payer balances and burning data credits. If this is disabled, all +# payers will have a default balance of 1,000,000 data credits, and burned +# data credits will only be stored in the database. This is set to false by +# default. +enable_solana_integration = "false" + +[solana] +# Solana RPC. This may contain a secret +rpc_url = "http://localhost:8899" +# Path to the keypair used to sign data credit burn solana transactions +burn_keypair = "" +# Solana cluster to use. "devnet" or "mainnet" +cluster = "devnet" +# Public key for the Data Credits Mint +dc_mint = "dcuc8Amr83Wz27ZkQ2K9NS6r8zRpf1J6cvArEBDZDmm" +# Public key for the DNT Mint (Mobile mint) +dnt_mint = "mb1eu7TzEc71KxDpsmsKoucSSuuoGLv1drys1oP2jh6" + +[database] + +url = "postgresql://postgres:password@localhost:5432/postgres" + +# Max connections to the database. Default below +# +# max_connections = 10 + +[ingest] + +# Input bucket details for ingest data + +# Name of bucket to access ingest data. Required +# +bucket = "helium-mainnet-mobile-ingest" + +# Region for bucket. Defaults to below +# +region = "us-west-2" + +# Optional URL for AWS api endpoint. Inferred from aws config settings or aws +# IAM context by default +# +# endpoint = "https://aws-s3-bucket.aws.com" + +[output] +# Output bucket for verified reports + +# Name of bucket to write details to. Required +# +bucket = "" + +# Region for bucket. Defaults to below +# +# region = "us-west-2" + +# Optional URL for AWS api endpoint. Inferred from aws config settings or aws +# IAM context by default +# +# endpoint = "https://aws-s3-bucket.aws.com" + +[metrics] + +# Endpoint for metrics. Default below +# +# endpoint = "127.0.0.1:19000" diff --git a/mobile_packet_verifier/src/accumulate.rs b/mobile_packet_verifier/src/accumulate.rs index cec831d28..34860b509 100644 --- a/mobile_packet_verifier/src/accumulate.rs +++ b/mobile_packet_verifier/src/accumulate.rs @@ -1,6 +1,10 @@ use chrono::{DateTime, Utc}; use file_store::mobile_session::DataTransferSessionIngestReport; use futures::{Stream, StreamExt}; +use mobile_config::{ + client::{Client, ClientError}, + gateway_info::GatewayInfoResolver, +}; use sqlx::{Postgres, Transaction}; #[derive(thiserror::Error, Debug)] @@ -11,9 +15,12 @@ pub enum AccumulationError { SqlxError(#[from] sqlx::Error), #[error("reports stream dropped")] ReportsStreamDropped, + #[error("config client error: {0}")] + ConfigClientError(#[from] ClientError), } pub async fn accumulate_sessions( + config_client: &mut Client, conn: &mut Transaction<'_, Postgres>, curr_file_ts: DateTime, reports: impl Stream, @@ -21,10 +28,17 @@ pub async fn accumulate_sessions( tokio::pin!(reports); while let Some(DataTransferSessionIngestReport { report, .. }) = reports.next().await { - if report.reward_cancelled { + let event = report.data_transfer_usage; + // If the reward has been cancelled or we cannot resolve this gateway, skip the + // report + if report.reward_cancelled + || config_client + .resolve_gateway_info(&event.pub_key) + .await? + .is_none() + { continue; } - let event = report.data_transfer_usage; sqlx::query( r#" INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, first_timestamp, last_timestamp) @@ -32,7 +46,7 @@ pub async fn accumulate_sessions( ON CONFLICT (pub_key, payer) DO UPDATE SET uploaded_bytes = data_transfer_sessions.uploaded_bytes + EXCLUDED.uploaded_bytes, downloaded_bytes = data_transfer_sessions.downloaded_bytes + EXCLUDED.downloaded_bytes, - last_timestamp = MAX(data_transfer_sessions.last_timestamp, EXCLUDED.last_timestamp) + last_timestamp = GREATEST(data_transfer_sessions.last_timestamp, EXCLUDED.last_timestamp) "# ) .bind(event.pub_key) diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs index fac830de5..26dc00e45 100644 --- a/mobile_packet_verifier/src/burner.rs +++ b/mobile_packet_verifier/src/burner.rs @@ -1,103 +1,63 @@ -use crate::settings::Settings; -use anchor_client::{RequestBuilder, RequestNamespace}; -use anchor_lang::AccountDeserialize; use chrono::{DateTime, Utc}; -use data_credits::{accounts, instruction, DelegatedDataCreditsV0}; use file_store::{file_sink::FileSinkClient, traits::TimestampEncode}; use helium_crypto::PublicKeyBinary; use helium_proto::services::packet_verifier::ValidDataTransferSession; -use helium_sub_daos::{DaoV0, SubDaoV0}; -use solana_client::{ - client_error::ClientError, nonblocking::rpc_client::RpcClient, - rpc_config::RpcSendTransactionConfig, -}; -use solana_sdk::{ - commitment_config::CommitmentConfig, - pubkey::{ParsePubkeyError, Pubkey}, - signature::Keypair, - signer::Signer, - transaction::Transaction as SolanaTransaction, -}; +use solana::SolanaNetwork; use sqlx::{FromRow, Pool, Postgres}; -use std::{ - collections::HashMap, - time::{SystemTime, SystemTimeError}, -}; +use std::collections::HashMap; #[derive(FromRow)] pub struct DataTransferSession { pub_key: PublicKeyBinary, payer: PublicKeyBinary, - upload_bytes: i64, - download_bytes: i64, + uploaded_bytes: i64, + downloaded_bytes: i64, first_timestamp: DateTime, last_timestamp: DateTime, } #[derive(Default)] pub struct PayerTotals { - total_bytes: u64, + total_dcs: u64, sessions: Vec, } impl PayerTotals { fn push_sess(&mut self, sess: DataTransferSession) { - self.total_bytes += sess.download_bytes as u64 + sess.upload_bytes as u64; + self.total_dcs += bytes_to_dc(sess.downloaded_bytes as u64 + sess.uploaded_bytes as u64); self.sessions.push(sess); } } -pub struct Burner { +pub struct Burner { valid_sessions: FileSinkClient, - provider: RpcClient, - program_cache: BurnProgramCache, - keypair: [u8; 64], - cluster: String, - enable_dc_burn: bool, + solana: S, +} + +impl Burner { + pub fn new(valid_sessions: FileSinkClient, solana: S) -> Self { + Self { + valid_sessions, + solana, + } + } } #[derive(thiserror::Error, Debug)] -pub enum BurnError { - #[error("Solana client error: {0}")] - SolanaClientError(#[from] ClientError), - #[error("Anchor error: {0}")] - AnchorError(#[from] anchor_lang::error::Error), - #[error("sql error: {0}")] - SqlError(#[from] sqlx::Error), - #[error("Parse pubkey error: {0}")] - ParsePubkeyError(#[from] ParsePubkeyError), +pub enum BurnError { #[error("file store error: {0}")] FileStoreError(#[from] file_store::Error), - #[error("Burn transaction {0} failed")] - TransactionFailed(solana_sdk::signature::Signature), - #[error("DC burn authority does not match keypair")] - InvalidKeypair, - #[error("System time error: {0}")] - SystemTimeError(#[from] SystemTimeError), + #[error("sql error: {0}")] + SqlError(#[from] sqlx::Error), + #[error("solana error: {0}")] + SolanaError(E), } -impl Burner { - pub async fn new( - settings: &Settings, - valid_sessions: FileSinkClient, - provider: RpcClient, - keypair: Keypair, - ) -> Result { - let program_cache = BurnProgramCache::new(settings, &provider).await?; - if program_cache.dc_burn_authority != keypair.pubkey() { - return Err(BurnError::InvalidKeypair); - } - Ok(Self { - provider, - program_cache, - valid_sessions, - keypair: keypair.to_bytes(), - cluster: settings.cluster.clone(), - enable_dc_burn: settings.enable_dc_burn, - }) - } - - pub async fn burn(&self, pool: &Pool) -> Result<(), BurnError> { +impl Burner +where + S: SolanaNetwork, +{ + pub async fn burn(&self, pool: &Pool) -> Result<(), BurnError> { // Fetch all of the sessions let sessions: Vec = sqlx::query_as("SELECT * FROM data_transfer_sessions") @@ -116,33 +76,36 @@ impl Burner { for ( payer, PayerTotals { - total_bytes, + total_dcs, sessions, }, ) in payer_totals.into_iter() { - let amount = bytes_to_dc(total_bytes); + tracing::info!("Burning {total_dcs} DC from {payer}"); - if self.enable_dc_burn { - self.burn_data_credits(&payer, amount).await?; - } + self.solana + .burn_data_credits(&payer, total_dcs) + .await + .map_err(BurnError::SolanaError)?; // Delete from the data transfer session and write out to S3 - sqlx::query("DELETE FROM data_tranfer_sessions WHERE payer = $1") + sqlx::query("DELETE FROM data_transfer_sessions WHERE payer = $1") .bind(payer) .execute(pool) .await?; for session in sessions { + let num_dcs = + bytes_to_dc(session.uploaded_bytes as u64 + session.downloaded_bytes as u64); self.valid_sessions .write( ValidDataTransferSession { pub_key: session.pub_key.into(), payer: session.payer.into(), - upload_bytes: session.upload_bytes as u64, - download_bytes: session.download_bytes as u64, - num_dcs: amount, + upload_bytes: session.uploaded_bytes as u64, + download_bytes: session.downloaded_bytes as u64, + num_dcs, first_timestamp: session.first_timestamp.encode_timestamp_millis(), last_timestamp: session.last_timestamp.encode_timestamp_millis(), }, @@ -154,105 +117,6 @@ impl Burner { Ok(()) } - - async fn burn_data_credits( - &self, - payer: &PublicKeyBinary, - amount: u64, - ) -> Result<(), BurnError> { - tracing::info!("Burning {} DC from {}", amount, payer); - - // Fetch the sub dao epoch info: - const EPOCH_LENGTH: u64 = 60 * 60 * 24; - let epoch = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH)? - .as_secs() - / EPOCH_LENGTH; - let (sub_dao_epoch_info, _) = Pubkey::find_program_address( - &[ - "sub_dao_epoch_info".as_bytes(), - self.program_cache.sub_dao.as_ref(), - &epoch.to_le_bytes(), - ], - &helium_sub_daos::ID, - ); - - // Burn the DC for the payer - let ddc_key = crate::pdas::delegated_data_credits(&self.program_cache.sub_dao, payer); - let account_data = self.provider.get_account_data(&ddc_key).await?; - let mut account_data = account_data.as_ref(); - let escrow_account = - DelegatedDataCreditsV0::try_deserialize(&mut account_data)?.escrow_account; - - let instructions = { - let request = RequestBuilder::from( - data_credits::id(), - &self.cluster, - std::rc::Rc::new(Keypair::from_bytes(&self.keypair).unwrap()), - Some(CommitmentConfig::confirmed()), - RequestNamespace::Global, - ); - - let accounts = accounts::BurnDelegatedDataCreditsV0 { - sub_dao_epoch_info, - dao: self.program_cache.dao, - sub_dao: self.program_cache.sub_dao, - account_payer: self.program_cache.account_payer, - data_credits: self.program_cache.data_credits, - delegated_data_credits: ddc_key, - token_program: spl_token::id(), - helium_sub_daos_program: helium_sub_daos::id(), - system_program: solana_program::system_program::id(), - dc_burn_authority: self.program_cache.dc_burn_authority, - dc_mint: self.program_cache.dc_mint, - escrow_account, - registrar: self.program_cache.registrar, - }; - let args = instruction::BurnDelegatedDataCreditsV0 { - args: data_credits::BurnDelegatedDataCreditsArgsV0 { amount }, - }; - - // As far as I can tell, the instructions function does not actually have any - // error paths. - request - .accounts(accounts) - .args(args) - .instructions() - .unwrap() - }; - - let blockhash = self.provider.get_latest_blockhash().await?; - let signer = Keypair::from_bytes(&self.keypair).unwrap(); - - let tx = SolanaTransaction::new_signed_with_payer( - &instructions, - Some(&signer.pubkey()), - &[&signer], - blockhash, - ); - - // Preflight can be flakey, so we skip it for now - let config = RpcSendTransactionConfig { - skip_preflight: true, - ..Default::default() - }; - let signature = self - .provider - .send_transaction_with_config(&tx, config) - .await?; - let result = self.provider.confirm_transaction(&signature).await?; - - if !result { - return Err(BurnError::TransactionFailed(signature)); - } - - tracing::info!( - "Successfully burned data credits. Transaction: {}", - signature - ); - - Ok(()) - } } const BYTES_PER_DC: u64 = 66; @@ -261,49 +125,3 @@ fn bytes_to_dc(bytes: u64) -> u64 { let bytes = bytes.max(BYTES_PER_DC); (bytes + BYTES_PER_DC - 1) / BYTES_PER_DC } - -/// Cached pubkeys for the burn program -pub struct BurnProgramCache { - pub account_payer: Pubkey, - pub data_credits: Pubkey, - pub sub_dao: Pubkey, - pub dao: Pubkey, - pub dc_mint: Pubkey, - pub dc_burn_authority: Pubkey, - pub registrar: Pubkey, -} - -impl BurnProgramCache { - pub async fn new(settings: &Settings, provider: &RpcClient) -> Result { - let (account_payer, _) = - Pubkey::find_program_address(&["account_payer".as_bytes()], &data_credits::ID); - let (data_credits, _) = Pubkey::find_program_address( - &["dc".as_bytes(), settings.dc_mint()?.as_ref()], - &data_credits::ID, - ); - let (sub_dao, _) = Pubkey::find_program_address( - &["sub_dao".as_bytes(), settings.dnt_mint()?.as_ref()], - &helium_sub_daos::ID, - ); - let (dao, dc_burn_authority) = { - let account_data = provider.get_account_data(&sub_dao).await?; - let mut account_data = account_data.as_ref(); - let sub_dao = SubDaoV0::try_deserialize(&mut account_data)?; - (sub_dao.dao, sub_dao.dc_burn_authority) - }; - let registrar = { - let account_data = provider.get_account_data(&dao).await?; - let mut account_data = account_data.as_ref(); - DaoV0::try_deserialize(&mut account_data)?.registrar - }; - Ok(Self { - account_payer, - data_credits, - sub_dao, - dao, - dc_mint: settings.dc_mint()?, - dc_burn_authority, - registrar, - }) - } -} diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index f95583878..d88b171b3 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -1,5 +1,6 @@ use crate::{burner::Burner, settings::Settings}; use anyhow::{bail, Error, Result}; +use chrono::{TimeZone, Utc}; use file_store::{ file_info_poller::{FileInfoStream, LookbackBehavior}, file_source, file_upload, @@ -7,36 +8,44 @@ use file_store::{ FileSinkBuilder, FileStore, FileType, }; use futures_util::TryFutureExt; -use solana_client::nonblocking::rpc_client::RpcClient; -use solana_sdk::signature::read_keypair_file; +use mobile_config::Client; +use solana::{SolanaNetwork, SolanaRpc}; use sqlx::{Pool, Postgres}; use tokio::{ sync::mpsc::Receiver, time::{sleep_until, Duration, Instant}, }; -pub struct Daemon { +pub struct Daemon { pool: Pool, - burner: Burner, + burner: Burner, reports: Receiver>, burn_period: Duration, + config_client: Client, } -impl Daemon { +impl Daemon { pub fn new( settings: &Settings, pool: Pool, reports: Receiver>, - burner: Burner, + burner: Burner, + config_client: Client, ) -> Self { Self { pool, burner, reports, burn_period: Duration::from_secs(60 * 60 * settings.burn_period as u64), + config_client, } } +} +impl Daemon +where + S: SolanaNetwork, +{ pub async fn run(mut self, shutdown: &triggered::Listener) -> Result<()> { let mut burn_time = Instant::now() + self.burn_period; loop { @@ -49,7 +58,7 @@ impl Daemon { let ts = file.file_info.timestamp; let mut transaction = self.pool.begin().await?; let reports = file.into_stream(&mut transaction).await?; - crate::accumulate::accumulate_sessions(&mut transaction, ts, reports).await?; + crate::accumulate::accumulate_sessions(&mut self.config_client, &mut transaction, ts, reports).await?; transaction.commit().await?; }, _ = sleep_until(burn_time) => { @@ -83,13 +92,15 @@ impl Cmd { .await?; sqlx::migrate!().run(&pool).await?; - // Set up the solana RpcClient: - let rpc_client = RpcClient::new(settings.solana_rpc.clone()); - - // Set up the balance burner: - let burn_keypair = match read_keypair_file(&settings.burn_keypair) { - Ok(kp) => kp, - Err(e) => bail!("Failed to read keypair file ({})", e), + // Set up the solana network: + let solana = if settings.enable_solana_integration { + let Some(ref solana_settings) = settings.solana else { + bail!("Missing solana section in settings"); + }; + // Set up the solana RpcClient: + Some(SolanaRpc::new(solana_settings).await?) + } else { + None }; let (file_upload_tx, file_upload_rx) = file_upload::message_channel(); @@ -108,7 +119,7 @@ impl Cmd { .create() .await?; - let burner = Burner::new(settings, valid_sessions, rpc_client, burn_keypair).await?; + let burner = Burner::new(valid_sessions, solana); let file_store = FileStore::from_settings(&settings.ingest).await?; @@ -116,13 +127,18 @@ impl Cmd { file_source::continuous_source::() .db(pool.clone()) .store(file_store) + .lookback(LookbackBehavior::StartAfter( + Utc.timestamp_millis_opt(0).unwrap(), + )) .file_type(FileType::DataTransferSessionIngestReport) .lookback(LookbackBehavior::StartAfter(settings.start_after())) .build()? .start(shutdown_listener.clone()) .await?; - let daemon = Daemon::new(settings, pool, reports, burner); + let config_client = Client::from_settings(&settings.config_client)?; + + let daemon = Daemon::new(settings, pool, reports, burner, config_client); tokio::try_join!( source_join_handle.map_err(Error::from), diff --git a/mobile_packet_verifier/src/lib.rs b/mobile_packet_verifier/src/lib.rs index e2fc0940d..ed66af217 100644 --- a/mobile_packet_verifier/src/lib.rs +++ b/mobile_packet_verifier/src/lib.rs @@ -1,5 +1,4 @@ pub mod accumulate; pub mod burner; pub mod daemon; -mod pdas; pub mod settings; diff --git a/mobile_packet_verifier/src/main.rs b/mobile_packet_verifier/src/main.rs index 838534fa1..2114911db 100644 --- a/mobile_packet_verifier/src/main.rs +++ b/mobile_packet_verifier/src/main.rs @@ -6,7 +6,7 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[derive(clap::Parser)] #[clap(version = env!("CARGO_PKG_VERSION"))] -#[clap(about = "Helium IOT Packer Verifier Server")] +#[clap(about = "Helium Mobile Packer Verifier Server")] pub struct Cli { /// Optional configuration file to use. If present the toml file at the /// given path will be loaded. Environemnt variables can override the diff --git a/mobile_packet_verifier/src/pdas.rs b/mobile_packet_verifier/src/pdas.rs deleted file mode 100644 index 5b7d6b04d..000000000 --- a/mobile_packet_verifier/src/pdas.rs +++ /dev/null @@ -1,22 +0,0 @@ -//! Functions for returning Program Derived Addresses in order to complete -//! actions on the Solana chain. - -use helium_crypto::PublicKeyBinary; -use sha2::{Digest, Sha256}; -use solana_sdk::pubkey::Pubkey; - -/// Returns the PDA for the Delegated Data Credits of the given `payer`. -pub fn delegated_data_credits(sub_dao: &Pubkey, payer: &PublicKeyBinary) -> Pubkey { - let mut hasher = Sha256::new(); - hasher.update(payer.to_string()); - let sha_digest = hasher.finalize(); - let (ddc_key, _) = Pubkey::find_program_address( - &[ - "delegated_data_credits".as_bytes(), - sub_dao.as_ref(), - &sha_digest, - ], - &data_credits::ID, - ); - ddc_key -} diff --git a/mobile_packet_verifier/src/settings.rs b/mobile_packet_verifier/src/settings.rs index e2d1d9fc5..0ee241cf4 100644 --- a/mobile_packet_verifier/src/settings.rs +++ b/mobile_packet_verifier/src/settings.rs @@ -1,8 +1,7 @@ use chrono::{DateTime, TimeZone, Utc}; use config::{Config, ConfigError, Environment, File}; use serde::Deserialize; -use solana_sdk::pubkey::{ParsePubkeyError, Pubkey}; -use std::path::{Path, PathBuf}; +use std::path::Path; #[derive(Debug, Deserialize)] pub struct Settings { @@ -12,23 +11,17 @@ pub struct Settings { pub log: String, /// Cache location for generated verified reports pub cache: String, - /// Solana RpcClient URL: - pub solana_rpc: String, - /// Path to the keypair for signing burn transactions - pub burn_keypair: PathBuf, /// Burn period in hours. (Default is 1) #[serde(default = "default_burn_period")] pub burn_period: i64, - pub cluster: String, - pub dc_mint: String, - pub dnt_mint: String, - pub hnt_mint: String, pub database: db_store::Settings, pub ingest: file_store::Settings, pub output: file_store::Settings, pub metrics: poc_metrics::Settings, #[serde(default)] - pub enable_dc_burn: bool, + pub enable_solana_integration: bool, + pub solana: Option, + pub config_client: mobile_config::ClientSettings, #[serde(default = "default_start_after")] pub start_after: u64, } @@ -72,18 +65,6 @@ impl Settings { .and_then(|config| config.try_deserialize()) } - pub fn dc_mint(&self) -> Result { - self.dc_mint.parse() - } - - pub fn dnt_mint(&self) -> Result { - self.dnt_mint.parse() - } - - pub fn hnt_mint(&self) -> Result { - self.hnt_mint.parse() - } - pub fn start_after(&self) -> DateTime { Utc.timestamp_opt(self.start_after as i64, 0) .single() diff --git a/solana/Cargo.toml b/solana/Cargo.toml new file mode 100644 index 000000000..7b452024a --- /dev/null +++ b/solana/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "solana" +version = "0.1.0" +description = "Solana integration for Helium Oracles" +edition.workspace = true +authors.workspace = true +license.workspace = true + +[dependencies] +async-trait = {workspace = true} +anchor-lang = {workspace = true} +anchor-client = {workspace = true} +solana-client = {workspace = true} +solana-sdk = {workspace = true} +solana-program = {workspace = true} +spl-token = {workspace = true} +serde = {workspace = true} +data-credits = {workspace = true} +helium-sub-daos = {workspace = true} +sha2 = {workspace = true} +tokio = {workspace = true} +helium-crypto = {workspace = true, features = ["solana"]} +thiserror = {workspace = true} +tracing = {workspace = true} \ No newline at end of file diff --git a/iot_packet_verifier/src/solana.rs b/solana/src/lib.rs similarity index 85% rename from iot_packet_verifier/src/solana.rs rename to solana/src/lib.rs index a3798b1c1..890942958 100644 --- a/iot_packet_verifier/src/solana.rs +++ b/solana/src/lib.rs @@ -1,4 +1,3 @@ -use crate::pdas; use anchor_client::{RequestBuilder, RequestNamespace}; use anchor_lang::AccountDeserialize; use async_trait::async_trait; @@ -6,6 +5,8 @@ use data_credits::DelegatedDataCreditsV0; use data_credits::{accounts, instruction}; use helium_crypto::PublicKeyBinary; use helium_sub_daos::{DaoV0, SubDaoV0}; +use serde::Deserialize; +use sha2::{Digest, Sha256}; use solana_client::{ client_error::ClientError, nonblocking::rpc_client::RpcClient, rpc_config::RpcSendTransactionConfig, @@ -14,7 +15,7 @@ use solana_sdk::{ commitment_config::CommitmentConfig, program_pack::Pack, pubkey::{ParsePubkeyError, Pubkey}, - signature::Keypair, + signature::{read_keypair_file, Keypair}, signer::Signer, transaction::Transaction, }; @@ -55,6 +56,17 @@ pub enum SolanaRpcError { InvalidKeypair, #[error("System time error: {0}")] SystemTimeError(#[from] SystemTimeError), + #[error("Failed to read keypair file")] + FailedToReadKeypairError, +} + +#[derive(Debug, Deserialize)] +pub struct Settings { + rpc_url: String, + cluster: String, + burn_keypair: String, + dc_mint: String, + dnt_mint: String, } pub struct SolanaRpc { @@ -65,19 +77,19 @@ pub struct SolanaRpc { } impl SolanaRpc { - pub async fn new( - provider: RpcClient, - cluster: String, - keypair: Keypair, - dc_mint: Pubkey, - dnt_mint: Pubkey, - ) -> Result, SolanaRpcError> { + pub async fn new(settings: &Settings) -> Result, SolanaRpcError> { + let dc_mint = settings.dc_mint.parse()?; + let dnt_mint = settings.dnt_mint.parse()?; + let Ok(keypair) = read_keypair_file(&settings.burn_keypair) else { + return Err(SolanaRpcError::FailedToReadKeypairError); + }; + let provider = RpcClient::new(settings.rpc_url.clone()); let program_cache = BurnProgramCache::new(&provider, dc_mint, dnt_mint).await?; if program_cache.dc_burn_authority != keypair.pubkey() { return Err(SolanaRpcError::InvalidKeypair); } Ok(Arc::new(Self { - cluster, + cluster: settings.cluster.clone(), provider, program_cache, keypair: keypair.to_bytes(), @@ -90,7 +102,7 @@ impl SolanaNetwork for SolanaRpc { type Error = SolanaRpcError; async fn payer_balance(&self, payer: &PublicKeyBinary) -> Result { - let ddc_key = pdas::delegated_data_credits(&self.program_cache.sub_dao, payer); + let ddc_key = delegated_data_credits(&self.program_cache.sub_dao, payer); let account_data = self.provider.get_account_data(&ddc_key).await?; let mut account_data = account_data.as_ref(); let ddc = DelegatedDataCreditsV0::try_deserialize(&mut account_data)?; @@ -104,8 +116,6 @@ impl SolanaNetwork for SolanaRpc { payer: &PublicKeyBinary, amount: u64, ) -> Result<(), Self::Error> { - tracing::info!("Burning {} DC from {}", amount, payer); - // Fetch the sub dao epoch info: const EPOCH_LENGTH: u64 = 60 * 60 * 24; let epoch = SystemTime::now() @@ -122,7 +132,7 @@ impl SolanaNetwork for SolanaRpc { ); // Fetch escrow account - let ddc_key = pdas::delegated_data_credits(&self.program_cache.sub_dao, payer); + let ddc_key = delegated_data_credits(&self.program_cache.sub_dao, payer); let account_data = self.provider.get_account_data(&ddc_key).await?; let mut account_data = account_data.as_ref(); let escrow_account = @@ -143,10 +153,7 @@ impl SolanaNetwork for SolanaRpc { sub_dao: self.program_cache.sub_dao, account_payer: self.program_cache.account_payer, data_credits: self.program_cache.data_credits, - delegated_data_credits: pdas::delegated_data_credits( - &self.program_cache.sub_dao, - payer, - ), + delegated_data_credits: delegated_data_credits(&self.program_cache.sub_dao, payer), token_program: spl_token::id(), helium_sub_daos_program: helium_sub_daos::id(), system_program: solana_program::system_program::id(), @@ -294,3 +301,19 @@ impl SolanaNetwork for Arc>> { Ok(()) } } + +/// Returns the PDA for the Delegated Data Credits of the given `payer`. +pub fn delegated_data_credits(sub_dao: &Pubkey, payer: &PublicKeyBinary) -> Pubkey { + let mut hasher = Sha256::new(); + hasher.update(payer.to_string()); + let sha_digest = hasher.finalize(); + let (ddc_key, _) = Pubkey::find_program_address( + &[ + "delegated_data_credits".as_bytes(), + sub_dao.as_ref(), + &sha_digest, + ], + &data_credits::ID, + ); + ddc_key +}