Skip to content

Commit

Permalink
feat: introduce DealId type (#2124)
Browse files Browse the repository at this point in the history
* feat: introduce DealId type

* fix clippy

* remove unused import
  • Loading branch information
justprosh authored Feb 27, 2024
1 parent a4485ab commit ed0984d
Show file tree
Hide file tree
Showing 12 changed files with 284 additions and 32 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aquamarine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ fs-utils = { workspace = true }
peer-metrics = { workspace = true }
particle-execution = { workspace = true }
workers = { workspace = true }
types = { workspace = true }

avm-server = { workspace = true }
libp2p = { workspace = true }
Expand Down
9 changes: 5 additions & 4 deletions aquamarine/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use fluence_keypair::KeyPair;
use fluence_libp2p::PeerId;
use particle_execution::{ParticleFunctionStatic, ServiceFunction};
use particle_protocol::{ExtendedParticle, Particle};
use types::DealId;

struct Reusables<RT> {
vm_id: usize,
Expand Down Expand Up @@ -68,7 +69,7 @@ pub struct Actor<RT, F> {
key_pair: KeyPair,
data_store: Arc<ParticleDataStore>,
spawner: Spawner,
deal_id: Option<String>,
deal_id: Option<DealId>,
}

impl<RT, F> Actor<RT, F>
Expand All @@ -85,7 +86,7 @@ where
particle_token: String,
key_pair: KeyPair,
data_store: Arc<ParticleDataStore>,
deal_id: Option<String>,
deal_id: Option<DealId>,
spawner: Spawner,
) -> Self {
Self {
Expand Down Expand Up @@ -161,7 +162,7 @@ where
parent: parent_span.as_ref(),
"Actor::poll_avm_future::future_ready",
particle_id= self.particle.id,
deal_id = self.deal_id
deal_id = self.deal_id.as_ref().map(String::from)
);
let _span_guard = span.enter();

Expand Down Expand Up @@ -281,7 +282,7 @@ where
let async_span = tracing::info_span!(
"Actor: async AVM process particle & call results",
particle_id = particle_id,
deal_id = self.deal_id
deal_id = self.deal_id.as_ref().map(String::from)
);
if let Some(ext_particle) = ext_particle.as_ref() {
async_span.follows_from(ext_particle.span.as_ref());
Expand Down
118 changes: 118 additions & 0 deletions crates/nox-tests/tests/workers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use connected_client::ConnectedClient;
use created_swarm::make_swarms;
use eyre::Context;
use hex::FromHex;
use maplit::hashmap;
use serde_json::json;
use workers::CUID;

async fn create_worker(client: &mut ConnectedClient, deal_id: &str) -> String {
let init_id_1 =
<CUID>::from_hex("54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea")
.unwrap();
let unit_ids = vec![init_id_1];
let data = hashmap! {
"deal_id" => json!(deal_id.to_string()),
"relay" => json!(client.node.to_string()),
"client" => json!(client.peer_id.to_string()),
"cu_ids" => json!(unit_ids)
};

let response = client
.execute_particle(
r#"
(seq
(call relay ("worker" "create") [deal_id cu_ids] worker_peer_id)
(call client ("return" "") [worker_peer_id])
)"#,
data.clone(),
)
.await
.unwrap();

let worker_id = response[0].as_str().unwrap().to_string();
assert_ne!(worker_id.len(), 0);

worker_id
}

async fn get_worker_id(client: &mut ConnectedClient, deal_id: &str) -> String {
let data = hashmap! {
"deal_id" => json!(deal_id.to_string()),
"relay" => json!(client.node.to_string()),
"client" => json!(client.peer_id.to_string())
};

let response = client
.execute_particle(
r#"
(seq
(call relay ("worker" "get_worker_id") [deal_id] get_worker_peer_id)
(seq
(ap get_worker_peer_id.$.[0] worker_peer_id)
(call client ("return" "") [worker_peer_id])
)
)"#,
data.clone(),
)
.await
.unwrap();

let worker_id = response[0].as_str().unwrap().to_string();
assert_ne!(worker_id.len(), 0);

worker_id
}

async fn is_worker_active(client: &mut ConnectedClient, deal_id: &str) -> bool {
let data = hashmap! {
"deal_id" => json!(deal_id.to_string()),
"relay" => json!(client.node.to_string()),
"client" => json!(client.peer_id.to_string())
};

let response = client
.execute_particle(
r#"
(seq
(call relay ("worker" "is_active") [deal_id] is_active)
(call client ("return" "") [is_active])
)"#,
data.clone(),
)
.await
.unwrap();

response[0].as_bool().unwrap()
}

#[tokio::test]
async fn test_worker_different_deal_ids() {
let swarms = make_swarms(1).await;
let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.await
.wrap_err("connect client")
.unwrap();

let deal_id_mixed_prefix = "0x1234aBcD";
let deal_id_mixed = "1234aBcD";
let deal_id_lowercase_prefix = "0x1234abcd";
let deal_id_lowercase = "1234abcd";

let worker_id_1 = create_worker(&mut client, deal_id_mixed_prefix).await;

let worker_id_2 = get_worker_id(&mut client, deal_id_mixed).await;
let worker_id_3 = get_worker_id(&mut client, deal_id_lowercase_prefix).await;
let worker_id_4 = get_worker_id(&mut client, deal_id_lowercase).await;
let worker_id_5 = get_worker_id(&mut client, deal_id_mixed_prefix).await;

assert_eq!(worker_id_1, worker_id_2);
assert_eq!(worker_id_1, worker_id_3);
assert_eq!(worker_id_1, worker_id_4);
assert_eq!(worker_id_1, worker_id_5);

assert!(is_worker_active(&mut client, deal_id_lowercase).await);
assert!(is_worker_active(&mut client, deal_id_mixed).await);
assert!(is_worker_active(&mut client, deal_id_lowercase_prefix).await);
assert!(is_worker_active(&mut client, deal_id_mixed_prefix).await);
}
2 changes: 1 addition & 1 deletion crates/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2021"


[dependencies]
libp2p-identity = { workspace = true, features = ["peerid"] }
libp2p-identity = { workspace = true, features = ["peerid", "ed25519", "rand"] }
serde = { workspace = true, features = ["derive"] }


Expand Down
124 changes: 124 additions & 0 deletions crates/types/src/deal_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::borrow::{Borrow, Cow};
use std::fmt::Display;

#[derive(Eq, Clone, Debug, Hash, PartialEq)]
pub struct DealId(String);

impl DealId {
pub fn normalize(str: &str) -> String {
str.trim_start_matches("0x").to_ascii_lowercase()
}

pub fn get_contract_address(&self) -> String {
format!("0x{}", self.0)
}
}

impl<'de> Deserialize<'de> for DealId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s = <Cow<'de, str>>::deserialize(deserializer)?;
Ok(DealId::from(s.borrow()))
}
}

impl Serialize for DealId {
fn serialize<S>(&self, s: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
where
S: Serializer,
{
self.0.to_string().serialize(s)
}
}

impl PartialEq<&str> for DealId {
fn eq(&self, other: &&str) -> bool {
self.0 == DealId::normalize(other)
}
}

impl PartialEq<&str> for &DealId {
fn eq(&self, other: &&str) -> bool {
self.0 == DealId::normalize(other)
}
}

impl PartialEq<String> for DealId {
fn eq(&self, other: &String) -> bool {
self.0 == DealId::normalize(other)
}
}

impl PartialEq<String> for &DealId {
fn eq(&self, other: &String) -> bool {
self.0 == DealId::normalize(other)
}
}

impl From<DealId> for String {
fn from(deal_id: DealId) -> Self {
deal_id.0
}
}

impl From<String> for DealId {
fn from(deal_id: String) -> Self {
DealId(Self::normalize(&deal_id))
}
}

impl From<&DealId> for String {
fn from(deal_id: &DealId) -> Self {
deal_id.0.clone()
}
}

impl From<&str> for DealId {
fn from(deal_id: &str) -> Self {
DealId(Self::normalize(deal_id))
}
}

impl Display for DealId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

#[cfg(test)]
mod tests {
use crate::deal_id::DealId;

#[test]
fn deal_id() {
let deal_id_prefix_lowercase = "0x1234567890abcdef";
let deal_id_prefix_uppercase = "0x1234567890ABCDEF";
let deal_id_no_prefix_lowercase = "1234567890abcdef";
let deal_id_no_prefix_uppercase = "1234567890ABCDEF";
let deal_id_prefix_mixed_case = "0x1234567890AbCdEf";
let deal_id_no_prefix_mixed_case = "1234567890AbCdEf";

let deals = vec![
deal_id_prefix_lowercase,
deal_id_prefix_uppercase,
deal_id_no_prefix_lowercase,
deal_id_no_prefix_uppercase,
deal_id_prefix_mixed_case,
deal_id_no_prefix_mixed_case,
];

let deals = deals.into_iter().map(DealId::from).collect::<Vec<_>>();

assert!(deals.iter().all(|deal| deal == deal_id_prefix_lowercase));
assert!(deals.iter().all(|deal| deal == deal_id_prefix_uppercase));
assert!(deals.iter().all(|deal| deal == deal_id_no_prefix_lowercase));
assert!(deals.iter().all(|deal| deal == deal_id_no_prefix_uppercase));
assert!(deals.iter().all(|deal| deal == deal_id_prefix_mixed_case));
assert!(deals
.iter()
.all(|deal| deal == deal_id_no_prefix_mixed_case));
}
}
3 changes: 3 additions & 0 deletions crates/types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
mod deal_id;
pub mod peer_id;
pub mod peer_scope;

pub use deal_id::DealId;
5 changes: 3 additions & 2 deletions crates/workers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use libp2p::PeerId;
use std::path::PathBuf;
use thiserror::Error;
use types::peer_scope::WorkerId;
use types::DealId;

#[derive(Debug, Error)]
pub enum KeyStorageError {
Expand Down Expand Up @@ -114,9 +115,9 @@ pub enum WorkersError {
err: toml::de::Error,
},
#[error("Worker for {deal_id} already exists")]
WorkerAlreadyExists { deal_id: String },
WorkerAlreadyExists { deal_id: DealId },
#[error("Worker for deal_id {0} not found")]
WorkerNotFoundByDeal(String),
WorkerNotFoundByDeal(DealId),
#[error("Worker {0} not found")]
WorkerNotFound(WorkerId),
#[error("Error serializing persisted worker: {err}")]
Expand Down
2 changes: 0 additions & 2 deletions crates/workers/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#![feature(try_blocks)]

pub type DealId = String;

mod error;
mod key_storage;
mod persistence;
Expand Down
2 changes: 1 addition & 1 deletion crates/workers/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub struct PersistedWorker {
impl From<PersistedWorker> for WorkerInfo {
fn from(val: PersistedWorker) -> Self {
WorkerInfo {
deal_id: val.deal_id,
deal_id: val.deal_id.into(),
creator: val.creator,
active: RwLock::new(val.active),
cu_ids: val.cu_ids,
Expand Down
Loading

0 comments on commit ed0984d

Please sign in to comment.