Skip to content

Commit

Permalink
Try
Browse files Browse the repository at this point in the history
  • Loading branch information
kurotych committed Jan 21, 2025
1 parent 3a0491b commit 48dcfd4
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 42 deletions.
9 changes: 4 additions & 5 deletions mobile_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,22 @@ use helium_proto::{
Message,
};
use sqlx::{Pool, Postgres};
use std::sync::Arc;
use tokio::sync::RwLock;
use std::{collections::HashMap, sync::Arc};
use tonic::{Request, Response, Status};

pub struct GatewayService {
key_cache: KeyCache,
metadata_pool: Pool<Postgres>,
signing_key: Arc<Keypair>,
tracked_radios_cache: Arc<RwLock<TrackedRadiosMap>>,
tracked_radios_cache: TrackedRadiosMap,
}

impl GatewayService {
pub fn new(
key_cache: KeyCache,
metadata_pool: Pool<Postgres>,
signing_key: Keypair,
tracked_radios_cache: Arc<RwLock<TrackedRadiosMap>>,
tracked_radios_cache: TrackedRadiosMap,
) -> Self {
Self {
key_cache,
Expand Down Expand Up @@ -342,7 +341,7 @@ impl mobile_config::Gateway for GatewayService {

async fn handle_updated_at(
mut gateway_info: GatewayInfo,
updated_radios: &TrackedRadiosMap,
updated_radios: &HashMap<PublicKeyBinary, DateTime<Utc>>,
min_updated_at: chrono::DateTime<Utc>,
) -> Option<GatewayInfo> {
// Check mobile_radio_tracker HashMap
Expand Down
3 changes: 1 addition & 2 deletions mobile_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ impl Daemon {
let admin_svc =
AdminService::new(settings, key_cache.clone(), key_cache_updater, pool.clone())?;

let tracked_radios_cache: Arc<RwLock<TrackedRadiosMap>> =
Arc::new(RwLock::new(TrackedRadiosMap::new()));
let tracked_radios_cache = TrackedRadiosMap::new();

let gateway_svc = GatewayService::new(
key_cache.clone(),
Expand Down
85 changes: 50 additions & 35 deletions mobile_config/src/mobile_radio_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::time::UNIX_EPOCH;
use std::{collections::HashMap, sync::Arc, time::Duration};

use chrono::{DateTime, Utc};
Expand All @@ -7,9 +8,52 @@ use sqlx::Row;
use sqlx::{Pool, Postgres, QueryBuilder};
use std::str::FromStr;
use task_manager::ManagedTask;
use tokio::sync::RwLock;
use tokio::sync::{RwLock, RwLockReadGuard};

#[derive(Clone)]
pub struct TrackedRadiosMap(Arc<RwLock<HashMap<PublicKeyBinary, DateTime<Utc>>>>);

const GET_UPDATED_RADIOS: &str =
"SELECT entity_key, last_changed_at FROM mobile_radio_tracker WHERE last_changed_at >= $1";

pub async fn get_updated_radios(
pool: &Pool<Postgres>,
min_updated_at: DateTime<Utc>,
) -> anyhow::Result<HashMap<PublicKeyBinary, DateTime<Utc>>> {
sqlx::query(GET_UPDATED_RADIOS)
.bind(min_updated_at)
.fetch(pool)
.map_err(anyhow::Error::from)
.try_fold(
HashMap::new(),
|mut map: HashMap<PublicKeyBinary, DateTime<Utc>>, row| async move {
let entity_key_b = row.get::<&[u8], &str>("entity_key");
let entity_key = bs58::encode(entity_key_b).into_string();
let updated_at = row.get::<DateTime<Utc>, &str>("last_changed_at");
map.insert(PublicKeyBinary::from_str(&entity_key)?, updated_at);
Ok(map)
},
)
.await
}

impl TrackedRadiosMap {
pub fn new() -> Self {
TrackedRadiosMap(Arc::new(RwLock::new(HashMap::new())))
}

pub async fn update(&self, pool: &Pool<Postgres>) -> anyhow::Result<()> {
let new_data = get_updated_radios(pool, UNIX_EPOCH.into()).await?;
let mut write_guard = self.0.write().await;
*write_guard = new_data;
Ok(())
}

pub async fn read(&self) -> RwLockReadGuard<'_, HashMap<PublicKeyBinary, DateTime<Utc>>> {
self.0.read().await
}
}

pub type TrackedRadiosMap = HashMap<PublicKeyBinary, DateTime<Utc>>;
type EntityKey = Vec<u8>;

#[derive(Debug, Clone, sqlx::FromRow)]
Expand Down Expand Up @@ -110,7 +154,8 @@ pub struct MobileRadioTracker {
pool: Pool<Postgres>,
metadata: Pool<Postgres>,
interval: Duration,
tracked_radios_cache: Arc<RwLock<TrackedRadiosMap>>,
// tracked_radios_cache: Arc<RwLock<TrackedRadiosMap>>,
tracked_radios_cache: TrackedRadiosMap,
}

impl ManagedTask for MobileRadioTracker {
Expand All @@ -132,7 +177,7 @@ impl MobileRadioTracker {
pool: Pool<Postgres>,
metadata: Pool<Postgres>,
interval: Duration,
tracked_radios_cache: Arc<RwLock<TrackedRadiosMap>>,
tracked_radios_cache: TrackedRadiosMap,
) -> Self {
Self {
pool,
Expand Down Expand Up @@ -174,13 +219,7 @@ impl MobileRadioTracker {
update_tracked_radios(&self.pool, updates).await?;

tracing::info!("updating tracked radios cache");
let tracked_radios_map: TrackedRadiosMap =
get_updated_radios(&self.pool, DateTime::UNIX_EPOCH).await?;
{
let mut map = self.tracked_radios_cache.write().await;
*map = tracked_radios_map;
}

self.tracked_radios_cache.update(&self.pool).await?;
tracing::info!("done");
Ok(())
}
Expand All @@ -203,30 +242,6 @@ async fn identify_changes(
.await
}

const GET_UPDATED_RADIOS: &str =
"SELECT entity_key, last_changed_at FROM mobile_radio_tracker WHERE last_changed_at >= $1";

pub async fn get_updated_radios(
pool: &Pool<Postgres>,
min_updated_at: DateTime<Utc>,
) -> anyhow::Result<TrackedRadiosMap> {
sqlx::query(GET_UPDATED_RADIOS)
.bind(min_updated_at)
.fetch(pool)
.map_err(anyhow::Error::from)
.try_fold(
HashMap::new(),
|mut map: HashMap<PublicKeyBinary, DateTime<Utc>>, row| async move {
let entity_key_b = row.get::<&[u8], &str>("entity_key");
let entity_key = bs58::encode(entity_key_b).into_string();
let updated_at = row.get::<DateTime<Utc>, &str>("last_changed_at");
map.insert(PublicKeyBinary::from_str(&entity_key)?, updated_at);
Ok(map)
},
)
.await
}

pub async fn get_tracked_radios(
pool: &Pool<Postgres>,
) -> anyhow::Result<HashMap<EntityKey, TrackedMobileRadio>> {
Expand Down

0 comments on commit 48dcfd4

Please sign in to comment.