Skip to content

Commit

Permalink
Add info_v2 and info_batch_v2
Browse files Browse the repository at this point in the history
  • Loading branch information
kurotych committed Dec 11, 2024
1 parent c3f30d5 commit 4f187ce
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 7 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 81 additions & 2 deletions mobile_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ use futures::{
use helium_crypto::{Keypair, PublicKey, PublicKeyBinary, Sign};
use helium_proto::{
services::mobile_config::{
self, GatewayInfoBatchReqV1, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoStreamReqV1,
GatewayInfoStreamReqV2, GatewayInfoStreamResV1, GatewayInfoStreamResV2,
self, GatewayInfoBatchReqV1, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoResV2,
GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamResV1,
GatewayInfoStreamResV2,
},
Message,
};
Expand Down Expand Up @@ -76,6 +77,7 @@ impl GatewayService {

#[tonic::async_trait]
impl mobile_config::Gateway for GatewayService {
// Deprecated
async fn info(&self, request: Request<GatewayInfoReqV1>) -> GrpcResult<GatewayInfoResV1> {
let request = request.into_inner();
telemetry::count_request("gateway", "info");
Expand Down Expand Up @@ -116,6 +118,47 @@ impl mobile_config::Gateway for GatewayService {
)
}

async fn info_v2(&self, request: Request<GatewayInfoReqV1>) -> GrpcResult<GatewayInfoResV2> {
let request = request.into_inner();
telemetry::count_request("gateway", "info");
custom_tracing::record_b58("pub_key", &request.address);
custom_tracing::record_b58("signer", &request.signer);

self.verify_request_signature_for_info(&request)?;

let pubkey: PublicKeyBinary = request.address.into();
tracing::debug!(pubkey = pubkey.to_string(), "fetching gateway info (v2)");

gateway_info::db::get_info(&self.metadata_pool, &pubkey)
.await
.map_err(|_| Status::internal("error fetching gateway info (v2)"))?
.map_or_else(
|| {
telemetry::count_gateway_chain_lookup("not-found");
Err(Status::not_found(pubkey.to_string()))
},
|info| {
if info.metadata.is_some() {
telemetry::count_gateway_chain_lookup("asserted");
} else {
telemetry::count_gateway_chain_lookup("not-asserted");
};
let info = info
.try_into()
.map_err(|_| Status::internal("error serializing gateway info (v2)"))?;
let mut res = GatewayInfoResV2 {
info: Some(info),
timestamp: Utc::now().encode_timestamp(),
signer: self.signing_key.public_key().into(),
signature: vec![],
};
res.signature = self.sign_response(&res.encode_to_vec())?;
Ok(Response::new(res))
},
)
}

// Deprecated
type info_batchStream = GrpcStreamResult<GatewayInfoStreamResV1>;
async fn info_batch(
&self,
Expand Down Expand Up @@ -152,6 +195,42 @@ impl mobile_config::Gateway for GatewayService {
Ok(Response::new(GrpcStreamResult::new(rx)))
}

type info_batch_v2Stream = GrpcStreamResult<GatewayInfoStreamResV2>;
async fn info_batch_v2(
&self,
request: Request<GatewayInfoBatchReqV1>,
) -> GrpcResult<Self::info_batch_v2Stream> {
let request = request.into_inner();
telemetry::count_request("gateway", "info-batch-v2");
custom_tracing::record_b58("signer", &request.signer);

let signer = verify_public_key(&request.signer)?;
self.verify_request_signature(&signer, &request)?;

tracing::debug!(
batch = request.addresses.len(),
"fetching gateways' info batch"
);

let pool = self.metadata_pool.clone();
let signing_key = self.signing_key.clone();
let batch_size = request.batch_size;
let addresses = request
.addresses
.into_iter()
.map(|key| key.into())
.collect::<Vec<PublicKeyBinary>>();

let (tx, rx) = tokio::sync::mpsc::channel(100);

tokio::spawn(async move {
let stream = gateway_info::db::batch_info_stream(&pool, &addresses)?;
stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await
});

Ok(Response::new(GrpcStreamResult::new(rx)))
}

// Deprecated
type info_streamStream = GrpcStreamResult<GatewayInfoStreamResV1>;
async fn info_stream(
Expand Down

0 comments on commit 4f187ce

Please sign in to comment.