From eabd6ed29d94b86eb77350a115a2f2aa65eeefb6 Mon Sep 17 00:00:00 2001 From: Rahul Garg Date: Thu, 2 Mar 2023 09:26:37 -0700 Subject: [PATCH] Add support for token-price conversion - Temp update Cargo.toml and Cargo.lock - Update file info and dump cli - Add docker-compose.yml for local testing --- Cargo.lock | 39 ++++++- Cargo.toml | 7 +- file_store/src/cli/dump.rs | 13 ++- file_store/src/cli/info.rs | 14 ++- file_store/src/file_info.rs | 5 + file_store/src/file_sink.rs | 2 +- price/Cargo.toml | 32 ++++++ price/README.md | 16 +++ price/docker-compose.yml | 67 ++++++++++++ price/pkg/settings-template.toml | 56 ++++++++++ price/src/error.rs | 9 ++ price/src/lib.rs | 12 ++ price/src/main.rs | 156 ++++++++++++++++++++++++++ price/src/price_generator.rs | 181 +++++++++++++++++++++++++++++++ price/src/price_service.rs | 42 +++++++ price/src/settings.rs | 172 +++++++++++++++++++++++++++++ 16 files changed, 809 insertions(+), 14 deletions(-) create mode 100644 price/Cargo.toml create mode 100644 price/README.md create mode 100644 price/docker-compose.yml create mode 100644 price/pkg/settings-template.toml create mode 100644 price/src/error.rs create mode 100644 price/src/lib.rs create mode 100644 price/src/main.rs create mode 100644 price/src/price_generator.rs create mode 100644 price/src/price_service.rs create mode 100644 price/src/settings.rs diff --git a/Cargo.lock b/Cargo.lock index 41d275080..6c094caab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -997,7 +997,7 @@ checksum = "b645a089122eccb6111b4f81cbc1a49f5900ac4666bb93ac027feaecf15607bf" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/gateway-rs.git?branch=main#a8987fe422af9416b7419fb68151f79c087c5dac" +source = "git+https://github.com/helium/gateway-rs.git?branch=rg/proto-test-branch#9a44476d56686034507fc67eda747735d7195a3a" dependencies = [ "base64 0.21.0", "byteorder", @@ -2816,7 +2816,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#e1f810224509ce1c141ae78c8589ab3740231ee1" +source = "git+https://github.com/helium/proto?branch=rg/price-oracle-grpc#d1b3391b20ad3ea47789adfcff030e3c176759bc" dependencies = [ "bytes", "prost", @@ -4373,7 +4373,7 @@ dependencies = [ "axum 0.6.3", "base64 0.21.0", "blake3", - "bs58 0.3.1", + "bs58 0.4.0", "chrono", "clap 3.2.23", "config", @@ -4484,6 +4484,35 @@ dependencies = [ "syn 1.0.104", ] +[[package]] +name = "price" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "clap 3.2.23", + "config", + "file-store", + "futures", + "futures-util", + "helium-proto", + "metrics", + "metrics-exporter-prometheus", + "poc-metrics", + "prost", + "pyth-sdk-solana", + "serde", + "serde_json", + "solana-client", + "solana-program", + "thiserror", + "tokio", + "tonic", + "tracing", + "tracing-subscriber", + "triggered", +] + [[package]] name = "proc-macro-crate" version = "0.1.5" @@ -4648,9 +4677,9 @@ dependencies = [ [[package]] name = "pyth-sdk-solana" -version = "0.7.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c637b4d8e9558e596a13cd5a11e406431c80eed4fbed01e4b1ff474257b04db9" +checksum = "4bc0e0ab39d0543220dcba7c248161aab70e25916b2c1585057abc0856ff4e0c" dependencies = [ "borsh", "borsh-derive", diff --git a/Cargo.toml b/Cargo.toml index c6f4ebdf6..f0f23d172 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,8 @@ members = [ "metrics", "denylist", "iot_packet_verifier", - "iot_config" + "iot_config", + "price", ] [workspace.package] @@ -53,12 +54,12 @@ sqlx = {version = "0", features = [ ]} helium-crypto = {version = "0.6.8", features=["sqlx-postgres", "multisig"]} -helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]} +helium-proto = {git = "https://github.com/helium/proto", branch = "rg/price-oracle-grpc", features = ["services"]} hextree = "*" solana-client = "1.14" solana-sdk = "1.14" reqwest = {version = "0", default-features=false, features = ["gzip", "json", "rustls-tls"]} -beacon = {git = "https://github.com/helium/gateway-rs.git", branch = "main"} +beacon = {git = "https://github.com/helium/gateway-rs.git", branch = "rg/proto-test-branch"} humantime = "2" metrics = "0" metrics-exporter-prometheus = "0" diff --git a/file_store/src/cli/dump.rs b/file_store/src/cli/dump.rs index daeb4d29d..f01d85121 100644 --- a/file_store/src/cli/dump.rs +++ b/file_store/src/cli/dump.rs @@ -15,9 +15,10 @@ use helium_proto::{ CellHeartbeatIngestReportV1, CellHeartbeatReqV1, Heartbeat, RadioRewardShare, SpeedtestAvg, SpeedtestIngestReportV1, SpeedtestReqV1, }, + price_oracle::PriceOracleReportV1, router::PacketRouterPacketReportV1, }, - BlockchainTxn, Message, RewardManifest, SubnetworkRewards, + BlockchainTokenTypeV1, BlockchainTxn, Message, RewardManifest, SubnetworkRewards, }; use serde_json::json; use std::io; @@ -163,8 +164,14 @@ impl Cmd { let packet_report = PacketRouterPacketReportV1::decode(msg)?; print_json(&json!({ "oui": packet_report.oui, - "timestamp": packet_report.gateway_timestamp_ms, - + "timestamp": packet_report.gateway_timestamp_ms}))?; + } + FileType::PriceReport => { + let manifest = PriceOracleReportV1::decode(msg)?; + print_json(&json!({ + "price": manifest.price, + "timestamp": manifest.timestamp, + "token_type": BlockchainTokenTypeV1::from_i32(manifest.token_type), }))?; } _ => (), diff --git a/file_store/src/cli/info.rs b/file_store/src/cli/info.rs index e0bd361d5..e4fe5e389 100644 --- a/file_store/src/cli/info.rs +++ b/file_store/src/cli/info.rs @@ -7,8 +7,9 @@ use crate::{ use bytes::BytesMut; use chrono::{DateTime, Utc}; use futures::StreamExt; -use helium_proto::services::poc_lora::{ - LoraBeaconIngestReportV1, LoraPocV1, LoraWitnessIngestReportV1, +use helium_proto::services::{ + poc_lora::{LoraBeaconIngestReportV1, LoraPocV1, LoraWitnessIngestReportV1}, + price_oracle::PriceOracleReportV1, }; use helium_proto::{ services::poc_mobile::{ @@ -72,6 +73,12 @@ impl MsgTimestamp>> for EntropyReportV1 { } } +impl MsgTimestamp>> for PriceOracleReportV1 { + fn timestamp(&self) -> Result> { + self.timestamp.to_timestamp() + } +} + fn get_timestamp(file_type: &FileType, buf: &[u8]) -> Result> { let result = match file_type { FileType::CellHeartbeat => CellHeartbeatReqV1::decode(buf) @@ -115,6 +122,9 @@ fn get_timestamp(file_type: &FileType, buf: &[u8]) -> Result> { }) }) .and_then(|beacon_report| beacon_report.timestamp())?, + FileType::PriceReport => PriceOracleReportV1::decode(buf) + .map_err(Error::from) + .and_then(|entry| entry.timestamp())?, _ => Utc::now(), }; diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index 840cec960..654eaf77d 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -111,6 +111,7 @@ pub const VALID_PACKET: &str = "valid_packet"; pub const INVALID_PACKET: &str = "invalid_packet"; pub const GATEWAY_REWARD_SHARE: &str = "gateway_reward_share"; pub const DATA_TRANSFER_SESSION_INGEST_REPORT: &str = "data_transfer_session_ingest_report"; +pub const PRICE_REPORT: &str = "price_report"; #[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)] #[serde(rename_all = "snake_case")] @@ -137,6 +138,7 @@ pub enum FileType { InvalidPacket, GatewayRewardShare, DataTransferSessionIngestReport, + PriceReport, } impl fmt::Display for FileType { @@ -164,6 +166,7 @@ impl fmt::Display for FileType { Self::InvalidPacket => INVALID_PACKET, Self::GatewayRewardShare => GATEWAY_REWARD_SHARE, Self::DataTransferSessionIngestReport => DATA_TRANSFER_SESSION_INGEST_REPORT, + Self::PriceReport => PRICE_REPORT, }; f.write_str(s) } @@ -194,6 +197,7 @@ impl FileType { Self::InvalidPacket => INVALID_PACKET, Self::GatewayRewardShare => GATEWAY_REWARD_SHARE, Self::DataTransferSessionIngestReport => DATA_TRANSFER_SESSION_INGEST_REPORT, + Self::PriceReport => PRICE_REPORT, } } } @@ -224,6 +228,7 @@ impl FromStr for FileType { INVALID_PACKET => Self::InvalidPacket, GATEWAY_REWARD_SHARE => Self::GatewayRewardShare, DATA_TRANSFER_SESSION_INGEST_REPORT => Self::DataTransferSessionIngestReport, + PRICE_REPORT => Self::PriceReport, _ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))), }; Ok(result) diff --git a/file_store/src/file_sink.rs b/file_store/src/file_sink.rs index fab1f0274..18184c01e 100644 --- a/file_store/src/file_sink.rs +++ b/file_store/src/file_sink.rs @@ -141,7 +141,7 @@ impl FileSinkBuilder { } } -#[derive(Clone, Debug)] +#[derive(Debug, Clone)] pub struct FileSinkClient { sender: MessageSender, metric: &'static str, diff --git a/price/Cargo.toml b/price/Cargo.toml new file mode 100644 index 000000000..5540b783a --- /dev/null +++ b/price/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "price" +version = "0.1.0" +description = "Price Oracle for the Helium Network" +edition.workspace = true +authors.workspace = true +license.workspace = true + +[dependencies] +anyhow = {workspace = true} +config = {workspace = true} +clap = {workspace = true} +thiserror = {workspace = true} +serde = {workspace = true} +serde_json = {workspace = true} +tonic = {workspace = true} +futures = {workspace = true} +futures-util = {workspace = true} +prost = {workspace = true} +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +metrics = {workspace = true } +metrics-exporter-prometheus = { workspace = true } +tokio = { workspace = true } +chrono = { workspace = true } +helium-proto = { workspace = true } +file-store = { path = "../file_store" } +poc-metrics = { path = "../metrics" } +triggered = {workspace = true} +pyth-sdk-solana = "0.7.1" +solana-client = ">= 1.9, < 1.15" +solana-program = ">= 1.9, < 1.15" diff --git a/price/README.md b/price/README.md new file mode 100644 index 000000000..3551527c5 --- /dev/null +++ b/price/README.md @@ -0,0 +1,16 @@ +# Price Oracle Server + +The price oracle server serves up price data for helium token(s) acquired from +the [pyth.network](https://pyth.network). + +The supported tokens are: +- HNT +- HST +- MOBILE +- IOT + +The price oracle server: + +- Requests price for HNT token at a regular interval (60s) from pyth. +- Stores and uploads [price_report](TODO) to a bucket. +- TODO: Acquire price for MOBILE and IOT token(s) when available on pyth. diff --git a/price/docker-compose.yml b/price/docker-compose.yml new file mode 100644 index 000000000..c8dae29f6 --- /dev/null +++ b/price/docker-compose.yml @@ -0,0 +1,67 @@ +version: "2.4" +services: + minio: + image: minio/minio:latest + environment: + MINIO_ROOT_USER: oracleadmin + MINIO_ROOT_PASSWORD: oracleadmin + ports: + - "9000:9000" + - "9090:9090" + volumes: + - bucket-data:/data + command: server /data --console-address ":9090" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + interval: 30s + timeout: 20s + retries: 3 + minio-setup: + image: minio/mc:latest + depends_on: + - minio + environment: + MINIO_ROOT_USER: oracleadmin + MINIO_ROOT_PASSWORD: oracleadmin + MINIO_BUCKETS: > + price + ORACLE_ID: oraclesecretid + ORACLE_KEY: oraclesecretkey + entrypoint: + - /bin/bash + - -c + - | + cat > /bucket-policy.json < /dev/null 2>&1 ; then + echo "creating bucket $${bucket}" + /usr/bin/mc mb localminio/$${bucket} + fi + done + /usr/bin/mc admin policy add localminio fullaccess /bucket-policy.json + /usr/bin/mc admin user add localminio $${ORACLE_ID} $${ORACLE_KEY} + /usr/bin/mc admin policy set localminio fullaccess user=$${ORACLE_ID} + +volumes: + bucket-data: diff --git a/price/pkg/settings-template.toml b/price/pkg/settings-template.toml new file mode 100644 index 000000000..79991c326 --- /dev/null +++ b/price/pkg/settings-template.toml @@ -0,0 +1,56 @@ +# log settings for the application (RUST_LOG format). Default below +# +# log = "price=debug" + +# RPC Endpoint for price oracles. Required. +rpc_endpoint = "https://api.devnet.solana.com" + +# Price sink roll time (mins). Default = 3 mins. Optional. +sink_roll_mins = 3 + +# Price tick interval (secs). Default = 60s. Optional. +tick_interval = 60 + +# Price age (get price as long as it was updated within `age` seconds of current time) (in secs). Optional. +age = 60 + +[cluster] +name = "devnet" +hnt_price_key = "6Eg8YdfFJQF2HHonzPUBSCCmyUEhrStg9VBLK957sBe6" +# mobile_price_key = +# hst_price_key = +# iot_price_key = + +# Listen addresses for public api. Default below +# +# hnt_listen = "0.0.0.0:8080" +# mobile_listen = "0.0.0.0:8081" +# iot_listen = "0.0.0.0:8082" +# hst_listen = "0.0.0.0:8082" + +# Cache folder to use. Default blow +# +# cache = "/var/data/price" + +[output] +# Output bucket for price + +# Name of bucket to write details to. Required +# +bucket = "price" + +# Region for bucket. Defaults to below +# +# region = "us-west-2" + +# Optional URL for AWS api endpoint. Inferred from aws config settings or aws +# IAM context by default +# +# endpoint = "https://aws-s3-bucket.aws.com" + + +[metrics] + +# Endpoint for metrics. Default below +# +# endpoint = "127.0.0.1:19000" diff --git a/price/src/error.rs b/price/src/error.rs new file mode 100644 index 000000000..0f188e27f --- /dev/null +++ b/price/src/error.rs @@ -0,0 +1,9 @@ +#[derive(thiserror::Error, Debug)] +pub enum PriceError { + #[error("unsupported token type: {0}")] + UnsupportedTokenType(i32), + #[error("unable to fetch price")] + UnableToFetch, + #[error("unknown price account key, token_type: {0}")] + UnknownKey(String), +} diff --git a/price/src/lib.rs b/price/src/lib.rs new file mode 100644 index 000000000..3e45414bc --- /dev/null +++ b/price/src/lib.rs @@ -0,0 +1,12 @@ +pub mod error; +pub mod price_generator; +pub mod price_service; +pub mod settings; + +pub use error::PriceError; +pub use price_generator::PriceGenerator; +pub use settings::Settings; + +use tonic::{Response, Status}; + +pub type GrpcResult = Result, Status>; diff --git a/price/src/main.rs b/price/src/main.rs new file mode 100644 index 000000000..fc6346b22 --- /dev/null +++ b/price/src/main.rs @@ -0,0 +1,156 @@ +use anyhow::{Error, Result}; +use clap::Parser; +use file_store::{file_sink, file_upload, FileType}; +use futures_util::TryFutureExt; +use helium_proto::{services::price_oracle::Server as PriceServer, BlockchainTokenTypeV1}; +use price::{price_service::PriceService, PriceGenerator, Settings}; +use std::path; +use tokio::{self, signal}; +use tonic::transport; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +#[derive(Debug, clap::Parser)] +#[clap(version = env!("CARGO_PKG_VERSION"))] +#[clap(about = "Helium Price Oracle Server")] +pub struct Cli { + /// Optional configuration file to use. If present the toml file at the + /// given path will be loaded. Env variables can override the + /// settings in the given file. + #[clap(short = 'c')] + config: Option, + + #[clap(subcommand)] + cmd: Cmd, +} + +impl Cli { + pub async fn run(self) -> Result<()> { + let settings = Settings::new(self.config)?; + self.cmd.run(settings).await + } +} + +#[derive(Debug, clap::Subcommand)] +pub enum Cmd { + Server(Server), +} + +impl Cmd { + pub async fn run(&self, settings: Settings) -> Result<()> { + match self { + Self::Server(cmd) => cmd.run(&settings).await, + } + } +} + +#[derive(Debug, clap::Args)] +pub struct Server {} + +impl Server { + pub async fn run(&self, settings: &Settings) -> Result<()> { + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::new(&settings.log)) + .with(tracing_subscriber::fmt::layer()) + .init(); + + // Install the prometheus metrics exporter + poc_metrics::start_metrics(&settings.metrics)?; + + // configure shutdown trigger + let (shutdown_trigger, shutdown) = triggered::trigger(); + tokio::spawn(async move { + let _ = signal::ctrl_c().await; + shutdown_trigger.trigger() + }); + + // Initialize uploader + let (file_upload_tx, file_upload_rx) = file_upload::message_channel(); + let file_upload = + file_upload::FileUpload::from_settings(&settings.output, file_upload_rx).await?; + + let store_base_path = path::Path::new(&settings.cache); + + // price generators + let mut hnt_price_generator = + PriceGenerator::new(settings, BlockchainTokenTypeV1::Hnt).await?; + let mut mobile_price_generator = + PriceGenerator::new(settings, BlockchainTokenTypeV1::Mobile).await?; + let mut iot_price_generator = + PriceGenerator::new(settings, BlockchainTokenTypeV1::Iot).await?; + let mut hst_price_generator = + PriceGenerator::new(settings, BlockchainTokenTypeV1::Hst).await?; + + // price watchers + let hnt_watch = hnt_price_generator.receiver(); + let mobile_watch = mobile_price_generator.receiver(); + let iot_watch = iot_price_generator.receiver(); + let hst_watch = hst_price_generator.receiver(); + + // price service + let hnt_svc = PriceService::new(hnt_watch)?; + let mobile_svc = PriceService::new(mobile_watch)?; + let iot_svc = PriceService::new(iot_watch)?; + let hst_svc = PriceService::new(hst_watch)?; + + // price servers + let hnt_server = PriceServer::new(hnt_svc); + let mobile_server = PriceServer::new(mobile_svc); + let iot_server = PriceServer::new(iot_svc); + let hst_server = PriceServer::new(hst_svc); + + let (price_sink, mut price_sink_server) = file_sink::FileSinkBuilder::new( + FileType::PriceReport, + store_base_path, + concat!(env!("CARGO_PKG_NAME"), "_report_submission"), + ) + .deposits(Some(file_upload_tx.clone())) + .roll_time(settings.sink_roll_time()) + .create() + .await?; + + let hnt_tonic_server = transport::Server::builder() + .add_service(hnt_server) + .serve_with_shutdown(settings.hnt_listen_addr()?, shutdown.clone()) + .map_err(Error::from); + let mobile_tonic_server = transport::Server::builder() + .add_service(mobile_server) + .serve_with_shutdown(settings.mobile_listen_addr()?, shutdown.clone()) + .map_err(Error::from); + let iot_tonic_server = transport::Server::builder() + .add_service(iot_server) + .serve_with_shutdown(settings.iot_listen_addr()?, shutdown.clone()) + .map_err(Error::from); + let hst_tonic_server = transport::Server::builder() + .add_service(hst_server) + .serve_with_shutdown(settings.hst_listen_addr()?, shutdown.clone()) + .map_err(Error::from); + + tokio::try_join!( + hnt_tonic_server, + mobile_tonic_server, + iot_tonic_server, + hst_tonic_server, + hnt_price_generator + .run(price_sink.clone(), &shutdown, BlockchainTokenTypeV1::Hnt) + .map_err(Error::from), + mobile_price_generator + .run(price_sink.clone(), &shutdown, BlockchainTokenTypeV1::Mobile) + .map_err(Error::from), + iot_price_generator + .run(price_sink.clone(), &shutdown, BlockchainTokenTypeV1::Iot) + .map_err(Error::from), + hst_price_generator + .run(price_sink, &shutdown, BlockchainTokenTypeV1::Hst) + .map_err(Error::from), + price_sink_server.run(&shutdown).map_err(Error::from), + file_upload.run(&shutdown).map_err(Error::from), + ) + .map(|_| ()) + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let cli = Cli::parse(); + cli.run().await +} diff --git a/price/src/price_generator.rs b/price/src/price_generator.rs new file mode 100644 index 000000000..78135f97e --- /dev/null +++ b/price/src/price_generator.rs @@ -0,0 +1,181 @@ +use crate::{PriceError, Settings}; +use anyhow::Result; +use chrono::Utc; +use file_store::file_sink; +use helium_proto::{services::price_oracle::PriceOracleReportV1, BlockchainTokenTypeV1}; +use pyth_sdk_solana::load_price_feed_from_account; +use serde::Serialize; +use solana_client::rpc_client::RpcClient; +use solana_program::pubkey::Pubkey as SolPubkey; +use tokio::{sync::watch, time}; + +pub type MessageSender = watch::Sender; +pub type MessageReceiver = watch::Receiver; + +#[derive(Debug, Clone, Serialize)] +pub struct Price { + pub timestamp: i64, + pub price: u64, + pub token_type: BlockchainTokenTypeV1, +} + +impl Price { + pub fn new(timestamp: i64, price: u64, token_type: BlockchainTokenTypeV1) -> Self { + Self { + timestamp, + price, + token_type, + } + } +} + +pub struct PriceGenerator { + settings: Settings, + client: RpcClient, + pub receiver: MessageReceiver, + pub sender: MessageSender, +} + +impl From for PriceOracleReportV1 { + fn from(value: Price) -> Self { + Self { + timestamp: value.timestamp as u64, + price: value.price, + token_type: value.token_type.into(), + } + } +} + +impl TryFrom for Price { + type Error = PriceError; + + fn try_from(value: PriceOracleReportV1) -> Result { + let tt: BlockchainTokenTypeV1 = BlockchainTokenTypeV1::from_i32(value.token_type) + .ok_or(PriceError::UnsupportedTokenType(value.token_type))?; + Ok(Self { + timestamp: value.timestamp as i64, + price: value.price, + token_type: tt, + }) + } +} + +impl PriceGenerator { + pub async fn new(settings: &Settings, token_type: BlockchainTokenTypeV1) -> Result { + let client = RpcClient::new(&settings.rpc_endpoint); + let price = match settings.price_key(token_type) { + None => Price::new(0, 0, token_type), + Some(price_key) => get_price(&client, &price_key, settings.age, token_type).await?, + }; + let (sender, receiver) = watch::channel(price); + Ok(Self { + settings: settings.clone(), + client, + receiver, + sender, + }) + } + + pub fn receiver(&self) -> MessageReceiver { + self.receiver.clone() + } + + pub async fn run( + &mut self, + file_sink: file_sink::FileSinkClient, + shutdown: &triggered::Listener, + token_type: BlockchainTokenTypeV1, + ) -> anyhow::Result<()> { + tracing::info!("started price generator for: {:?}", token_type); + let mut price_timer = time::interval(self.settings.tick_interval().to_std()?); + price_timer.set_missed_tick_behavior(time::MissedTickBehavior::Delay); + + loop { + if shutdown.is_triggered() { + break; + } + tokio::select! { + _ = shutdown.clone() => break, + _ = price_timer.tick() => match self.handle_price_tick(&file_sink, token_type).await { + Ok(()) => (), + Err(err) => { + tracing::error!("fatal price generator error: {err:?}"); + return Err(err) + } + } + } + } + tracing::info!("stopping price generator"); + Ok(()) + } + + async fn handle_price_tick( + &mut self, + file_sink: &file_sink::FileSinkClient, + token_type: BlockchainTokenTypeV1, + ) -> anyhow::Result<()> { + if let Some(price_key) = &self.settings.price_key(token_type) { + let new_price = + match get_price(&self.client, price_key, self.settings.age, token_type).await { + Ok(price) => price.price, + Err(err) => { + tracing::warn!("failed to get price: {err:?}"); + self.receiver.borrow().price + } + }; + let timestamp = Utc::now().timestamp(); + + self.sender.send_modify(|price| { + price.price = new_price; + price.timestamp = timestamp; + }); + + let price = &*self.receiver.borrow(); + tracing::info!( + "updating price: {} for {:?}, at: {}", + price.price.to_string(), + token_type, + price.timestamp + ); + + let price_report = PriceOracleReportV1::from(price.clone()); + tracing::info!("price_report: {:?}", price_report); + + file_sink.write(price_report, []).await?; + + Ok(()) + } else { + tracing::warn!("no price key for {:?}", token_type); + Ok(()) + } + } +} + +pub async fn get_price( + client: &RpcClient, + price_key: &SolPubkey, + age: u64, + token_type: BlockchainTokenTypeV1, +) -> Result { + let mut price_account = client.get_account(price_key)?; + tracing::debug!("price_account: {:?}", price_account); + + let current_time = Utc::now().timestamp(); + let price_feed = load_price_feed_from_account(price_key, &mut price_account)?; + tracing::debug!("price_feed: {:?}", price_feed); + + match price_feed.get_price_no_older_than(current_time, age) { + None => { + tracing::error!("unable to fetch price at {:?}", current_time); + Err(PriceError::UnableToFetch.into()) + } + Some(p) => { + tracing::info!("got price {:?} at {:?}", p, current_time); + Ok(Price { + price: p.price as u64, + timestamp: current_time, + token_type, + }) + } + } +} diff --git a/price/src/price_service.rs b/price/src/price_service.rs new file mode 100644 index 000000000..9528f88a1 --- /dev/null +++ b/price/src/price_service.rs @@ -0,0 +1,42 @@ +use crate::{price_generator::MessageReceiver, GrpcResult}; +use helium_proto::{ + services::price_oracle::{PriceOracle, PriceOracleReportV1, PriceOracleReqV1}, + BlockchainTokenTypeV1, +}; +use tonic::{Request, Status}; + +pub struct PriceService { + price_watch: MessageReceiver, +} + +impl PriceService { + pub fn new(price_watch: MessageReceiver) -> anyhow::Result { + Ok(Self { price_watch }) + } +} + +#[tonic::async_trait] +impl PriceOracle for PriceService { + async fn price_oracle( + &self, + request: Request, + ) -> GrpcResult { + let price_request = request.into_inner(); + let request_tt = price_request.token_type; + + let price = &*self.price_watch.borrow(); + + match BlockchainTokenTypeV1::from_i32(request_tt) { + None => Err(Status::not_found("unknown token_type {request_tt}")), + Some(tt) => { + if price.token_type == tt { + let inner_resp: PriceOracleReportV1 = price.clone().into(); + metrics::increment_counter!("price_server_get_count"); + Ok(tonic::Response::new(inner_resp)) + } else { + Err(Status::invalid_argument("invalid token_type {request_tt}")) + } + } + } + } +} diff --git a/price/src/settings.rs b/price/src/settings.rs new file mode 100644 index 000000000..1c11a297a --- /dev/null +++ b/price/src/settings.rs @@ -0,0 +1,172 @@ +use anyhow::Result; +use chrono::Duration; +use config::{Config, Environment, File}; +use helium_proto::BlockchainTokenTypeV1; +use serde::Deserialize; +use solana_program::pubkey::Pubkey as SolPubkey; +use std::{ + net::{AddrParseError, SocketAddr}, + path::Path, + str::FromStr, +}; + +#[derive(Debug, Deserialize, Clone)] +pub struct ClusterConfig { + pub name: String, + pub hnt_price_key: Option, + pub mobile_price_key: Option, + pub iot_price_key: Option, + pub hst_price_key: Option, +} + +#[derive(Debug, Deserialize, Clone)] +pub struct Settings { + /// RUST_LOG compatible settings string. Default to + /// "price=debug" + #[serde(default = "default_log")] + pub log: String, + /// Listen address for http requests for price. Default "0.0.0.0:8080" + #[serde(default = "default_hnt_listen_addr")] + pub hnt_listen: String, + #[serde(default = "default_mobile_listen_addr")] + pub mobile_listen: String, + #[serde(default = "default_iot_listen_addr")] + pub iot_listen: String, + #[serde(default = "default_hst_listen_addr")] + pub hst_listen: String, + /// Source URL for price data. Required + #[serde(default = "default_rpc_endpoint")] + pub rpc_endpoint: String, + /// Target output bucket details + pub output: file_store::Settings, + /// Folder for local cache of ingest data + #[serde(default = "default_cache")] + pub cache: String, + /// Metrics settings + pub metrics: poc_metrics::Settings, + /// Sink roll time (mins). Default = 3 mins. + #[serde(default = "default_sink_roll_mins")] + pub sink_roll_mins: i64, + /// Tick interval (secs). Default = 60s. + #[serde(default = "default_tick_interval")] + pub tick_interval: i64, + /// Price age (get price as long as it was updated within `age` seconds of current time) (in secs). + /// Default = 60s. + #[serde(default = "default_age")] + pub age: u64, + /// Cluster Configuration + pub cluster: ClusterConfig, +} + +pub fn default_rpc_endpoint() -> String { + "https://api.devnet.solana.com".to_string() +} + +pub fn default_log() -> String { + "price=debug".to_string() +} + +pub fn default_sink_roll_mins() -> i64 { + 3 +} + +pub fn default_tick_interval() -> i64 { + 60 +} + +pub fn default_age() -> u64 { + 60 +} + +pub fn default_cache() -> String { + "/var/data/price".to_string() +} + +pub fn default_hnt_listen_addr() -> String { + "0.0.0.0:8080".to_string() +} + +pub fn default_mobile_listen_addr() -> String { + "0.0.0.0:8081".to_string() +} + +pub fn default_iot_listen_addr() -> String { + "0.0.0.0:8082".to_string() +} + +pub fn default_hst_listen_addr() -> String { + "0.0.0.0:8083".to_string() +} + +impl Settings { + /// Load Settings from a given path. Settings are loaded from a given + /// optional path and can be overriden with environment variables. + /// + /// Environemnt overrides have the same name as the entries in the settings + /// file in uppercase and prefixed with "price_". For example + /// "price_LOG_" will override the log setting. + pub fn new>(path: Option

) -> Result { + let mut builder = Config::builder(); + + if let Some(file) = path { + // Add optional settings file + builder = builder + .add_source(File::with_name(&file.as_ref().to_string_lossy()).required(false)); + } + // Add in settings from the environment (with a prefix of APP) + // Eg.. `MI_DEBUG=1 ./target/app` would set the `debug` key + builder + .add_source(Environment::with_prefix("price").separator("_")) + .build() + .and_then(|config| config.try_deserialize()) + } + + pub fn hnt_listen_addr(&self) -> Result { + SocketAddr::from_str(&self.hnt_listen) + } + + pub fn mobile_listen_addr(&self) -> Result { + SocketAddr::from_str(&self.mobile_listen) + } + + pub fn iot_listen_addr(&self) -> Result { + SocketAddr::from_str(&self.iot_listen) + } + + pub fn hst_listen_addr(&self) -> Result { + SocketAddr::from_str(&self.hst_listen) + } + + pub fn sink_roll_time(&self) -> Duration { + Duration::minutes(self.sink_roll_mins) + } + + pub fn tick_interval(&self) -> Duration { + Duration::seconds(self.tick_interval) + } + + pub fn price_key(&self, token_type: BlockchainTokenTypeV1) -> Option { + match token_type { + BlockchainTokenTypeV1::Hnt => self + .cluster + .hnt_price_key + .as_ref() + .map(|key| SolPubkey::from_str(key).expect("unable to parse")), + BlockchainTokenTypeV1::Hst => self + .cluster + .hst_price_key + .as_ref() + .map(|key| SolPubkey::from_str(key).expect("unable to parse")), + BlockchainTokenTypeV1::Mobile => self + .cluster + .mobile_price_key + .as_ref() + .map(|key| SolPubkey::from_str(key).expect("unable to parse")), + BlockchainTokenTypeV1::Iot => self + .cluster + .iot_price_key + .as_ref() + .map(|key| SolPubkey::from_str(key).expect("unable to parse")), + } + } +}