Skip to content

Commit

Permalink
add server provider rewards support
Browse files Browse the repository at this point in the history
  • Loading branch information
andymck committed Dec 5, 2023
1 parent 2be9c1d commit 4cb076f
Show file tree
Hide file tree
Showing 16 changed files with 523 additions and 18 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ sqlx = {version = "0", features = [
"runtime-tokio-rustls"
]}
helium-crypto = {version = "0.8.1", features=["sqlx-postgres", "multisig"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "andymck/service-provider-rewards", features = ["services"]}
hextree = "*"
solana-client = "1.14"
solana-sdk = "1.14"
solana-program = "1.11"
spl-token = "3.5.0"
reqwest = {version = "0", default-features=false, features = ["gzip", "json", "rustls-tls"]}
beacon = { git = "https://github.com/helium/proto", branch = "master" }
beacon = { git = "https://github.com/helium/proto", branch = "andymck/service-provider-rewards" }
humantime = "2"
metrics = "0"
metrics-exporter-prometheus = "0"
Expand Down
4 changes: 4 additions & 0 deletions file_store/src/cli/dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ impl Cmd {
"subscriber_id": reward.subscriber_id,
"discovery_location_amount": reward.discovery_location_amount,
}))?,
Some(Reward::ServiceProviderReward(reward)) => print_json(&json!({
"service_provider": reward.service_provider_id,
"amount": reward.amount,
}))?,
_ => (),
}
}
Expand Down
2 changes: 2 additions & 0 deletions file_store/src/traits/msg_verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ impl_msg_verify!(mobile_config::AuthorizationListReqV1, signature);
impl_msg_verify!(mobile_config::AuthorizationListResV1, signature);
impl_msg_verify!(mobile_config::EntityVerifyReqV1, signature);
impl_msg_verify!(mobile_config::EntityVerifyResV1, signature);
impl_msg_verify!(mobile_config::CarrierKeyToEntityReqV1, signature);
impl_msg_verify!(mobile_config::CarrierKeyToEntityResV1, signature);
impl_msg_verify!(mobile_config::GatewayInfoReqV1, signature);
impl_msg_verify!(mobile_config::GatewayInfoStreamReqV1, signature);
impl_msg_verify!(mobile_config::GatewayInfoResV1, signature);
Expand Down
7 changes: 7 additions & 0 deletions mobile_config/migrations/5_carrier_service.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
create table carrier_keys (
pubkey text not null,
entity_key text not null,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
PRIMARY KEY(pubkey)
);
78 changes: 78 additions & 0 deletions mobile_config/src/carrier_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use crate::{key_cache::KeyCache, telemetry, verify_public_key, GrpcResult};
use chrono::Utc;
use file_store::traits::{MsgVerify, TimestampEncode};
use helium_crypto::{Keypair, PublicKey, Sign};
use helium_proto::{
services::mobile_config::{self, CarrierKeyToEntityReqV1, CarrierKeyToEntityResV1},
Message,
};
use sqlx::{Pool, Postgres};
use tonic::{Request, Response, Status};

pub struct CarrierService {
key_cache: KeyCache,
pool: Pool<Postgres>,
signing_key: Keypair,
}

impl CarrierService {
pub fn new(key_cache: KeyCache, pool: Pool<Postgres>, signing_key: Keypair) -> Self {
Self {
key_cache,
pool,
signing_key,
}
}

fn verify_request_signature<R>(&self, signer: &PublicKey, request: &R) -> Result<(), Status>
where
R: MsgVerify,
{
if self.key_cache.verify_signature(signer, request).is_ok() {
tracing::info!(signer = signer.to_string(), "request authorized");
return Ok(());
}
Err(Status::permission_denied("unauthorized request signature"))
}

fn sign_response(&self, response: &[u8]) -> Result<Vec<u8>, Status> {
self.signing_key
.sign(response)
.map_err(|_| Status::internal("response signing error"))
}

async fn key_to_entity(&self, pubkey: &String) -> Result<String, Status> {
let entity_key = sqlx::query_scalar::<_, String>(
" select entity_key from carrier_keys where pubkey = $1 ",
)
.bind(pubkey)
.fetch_one(&self.pool)
.await
.map_err(|_| Status::internal("carrier entity key not found"))?;
Ok(entity_key)
}
}

#[tonic::async_trait]
impl mobile_config::CarrierService for CarrierService {
async fn key_to_entity(
&self,
request: Request<CarrierKeyToEntityReqV1>,
) -> GrpcResult<CarrierKeyToEntityResV1> {
let request = request.into_inner();
telemetry::count_request("carrier_service", "key_to_entity");

let signer = verify_public_key(&request.signer)?;
self.verify_request_signature(&signer, &request)?;

let entity_key = self.key_to_entity(&request.pubkey).await?;
let mut response = CarrierKeyToEntityResV1 {
entity_key,
timestamp: Utc::now().encode_timestamp(),
signer: self.signing_key.public_key().into(),
signature: vec![],
};
response.signature = self.sign_response(&response.encode_to_vec())?;
Ok(Response::new(response))
}
}
76 changes: 76 additions & 0 deletions mobile_config/src/client/carrier_service_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use super::{call_with_retry, ClientError, Settings, CACHE_EVICTION_FREQUENCY};
use async_trait::async_trait;
use file_store::traits::MsgVerify;
use helium_crypto::{Keypair, PublicKey, Sign};
use helium_proto::{
services::{mobile_config, Channel},
Message,
};
use retainer::Cache;
use std::{sync::Arc, time::Duration};

#[async_trait]
pub trait CarrierServiceVerifier {
type Error;
async fn key_to_rewardable_entity<'a>(&self, entity_id: &'a str)
-> Result<String, Self::Error>;
}
#[derive(Clone)]
pub struct CarrierServiceClient {
client: mobile_config::CarrierServiceClient<Channel>,
signing_key: Arc<Keypair>,
config_pubkey: PublicKey,
cache: Arc<Cache<String, String>>,
cache_ttl: Duration,
}

#[async_trait]
impl CarrierServiceVerifier for CarrierServiceClient {
type Error = ClientError;

async fn key_to_rewardable_entity<'a>(&self, pubkey: &'a str) -> Result<String, ClientError> {
if let Some(carrier_found) = self.cache.get(&pubkey.to_string()).await {
return Ok(carrier_found.value().clone());
}

let mut request = mobile_config::CarrierKeyToEntityReqV1 {
pubkey: pubkey.to_string(),
signer: self.signing_key.public_key().into(),
signature: vec![],
};
request.signature = self.signing_key.sign(&request.encode_to_vec())?;
tracing::debug!(?pubkey, "getting entity key for carrier on-chain");
let response = match call_with_retry!(self.client.clone().key_to_entity(request.clone())) {
Ok(verify_res) => {
let response = verify_res.into_inner();
response.verify(&self.config_pubkey)?;
response.entity_key
}
Err(status) => Err(status)?,
};
self.cache
.insert(pubkey.to_string(), response.clone(), self.cache_ttl)
.await;
Ok(response)
}
}

impl CarrierServiceClient {
pub fn from_settings(settings: &Settings) -> Result<Self, Box<helium_crypto::Error>> {
let cache = Arc::new(Cache::new());
let cloned_cache = cache.clone();
tokio::spawn(async move {
cloned_cache
.monitor(4, 0.25, CACHE_EVICTION_FREQUENCY)
.await
});

Ok(Self {
client: settings.connect_carrier_service_client(),
signing_key: settings.signing_keypair()?,
config_pubkey: settings.config_pubkey()?,
cache_ttl: settings.cache_ttl(),
cache,
})
}
}
2 changes: 2 additions & 0 deletions mobile_config/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
pub mod authorization_client;
pub mod carrier_service_client;
pub mod entity_client;
pub mod gateway_client;
mod settings;

use std::time::Duration;

pub use authorization_client::AuthorizationClient;
pub use carrier_service_client::CarrierServiceClient;
pub use entity_client::EntityClient;
pub use gateway_client::GatewayClient;
pub use settings::Settings;
Expand Down
5 changes: 5 additions & 0 deletions mobile_config/src/client/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ impl Settings {
mobile_config::EntityClient::new(channel)
}

pub fn connect_carrier_service_client(&self) -> mobile_config::CarrierServiceClient<Channel> {
let channel = connect_channel(self);
mobile_config::CarrierServiceClient::new(channel)
}

pub fn signing_keypair(
&self,
) -> Result<Arc<helium_crypto::Keypair>, Box<helium_crypto::Error>> {
Expand Down
1 change: 1 addition & 0 deletions mobile_config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tonic::{Response, Status};

pub mod admin_service;
pub mod authorization_service;
pub mod carrier_service;
pub mod client;
pub mod entity_service;
pub mod gateway_info;
Expand Down
11 changes: 8 additions & 3 deletions mobile_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use clap::Parser;
use futures::future::LocalBoxFuture;
use futures_util::TryFutureExt;
use helium_proto::services::mobile_config::{
AdminServer, AuthorizationServer, EntityServer, GatewayServer,
AdminServer, AuthorizationServer, CarrierServiceServer, EntityServer, GatewayServer,
};
use mobile_config::{
admin_service::AdminService, authorization_service::AuthorizationService,
entity_service::EntityService, gateway_service::GatewayService, key_cache::KeyCache,
settings::Settings,
carrier_service::CarrierService, entity_service::EntityService,
gateway_service::GatewayService, key_cache::KeyCache, settings::Settings,
};
use std::{net::SocketAddr, path::PathBuf, time::Duration};
use task_manager::{ManagedTask, TaskManager};
Expand Down Expand Up @@ -86,13 +86,16 @@ impl Daemon {
metadata_pool.clone(),
settings.signing_keypair()?,
);
let carrier_svc =
CarrierService::new(key_cache.clone(), pool.clone(), settings.signing_keypair()?);

let grpc_server = GrpcServer {
listen_addr,
admin_svc,
gateway_svc,
auth_svc,
entity_svc,
carrier_svc,
};

TaskManager::builder().add_task(grpc_server).start().await
Expand All @@ -105,6 +108,7 @@ pub struct GrpcServer {
gateway_svc: GatewayService,
auth_svc: AuthorizationService,
entity_svc: EntityService,
carrier_svc: CarrierService,
}

impl ManagedTask for GrpcServer {
Expand All @@ -121,6 +125,7 @@ impl ManagedTask for GrpcServer {
.add_service(GatewayServer::new(self.gateway_svc))
.add_service(AuthorizationServer::new(self.auth_svc))
.add_service(EntityServer::new(self.entity_svc))
.add_service(CarrierServiceServer::new(self.carrier_svc))
.serve_with_shutdown(self.listen_addr, shutdown)
.map_err(Error::from)
.await
Expand Down
6 changes: 5 additions & 1 deletion mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use file_store::{
FileType,
};
use futures_util::TryFutureExt;
use mobile_config::client::{AuthorizationClient, EntityClient, GatewayClient};
use mobile_config::client::{
AuthorizationClient, CarrierServiceClient, EntityClient, GatewayClient,
};
use price::PriceTracker;
use tokio::signal;

Expand Down Expand Up @@ -53,6 +55,7 @@ impl Cmd {
let gateway_client = GatewayClient::from_settings(&settings.config_client)?;
let auth_client = AuthorizationClient::from_settings(&settings.config_client)?;
let entity_client = EntityClient::from_settings(&settings.config_client)?;
let carrier_client = CarrierServiceClient::from_settings(&settings.config_client)?;

// price tracker
let (price_tracker, tracker_process) =
Expand Down Expand Up @@ -217,6 +220,7 @@ impl Cmd {

let rewarder = Rewarder::new(
pool.clone(),
carrier_client,
Duration::hours(reward_period_hours),
Duration::minutes(settings.reward_offset_minutes),
mobile_rewards,
Expand Down
Loading

0 comments on commit 4cb076f

Please sign in to comment.