Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hybrid network support #1606

Merged
merged 18 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 135 additions & 34 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ edition = "2018"

[features]
default = ['erc20-driver', 'zksync-driver', 'gftp/bin']
hybrid-net = ['ya-net/hybrid-net']
static-openssl = ["openssl/vendored", "openssl-probe"]
dummy-driver = ['ya-dummy-driver']
erc20-driver = ['ya-erc20-driver']
Expand Down
5 changes: 1 addition & 4 deletions core/identity/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ appdirs = "0.2"
chrono = { version = "0.4", features = ["serde"] }
diesel = { version = "1.4", features = ["sqlite", "r2d2", "chrono"] }
diesel_migrations = "1.4"
# Lock ethsign on this commit to match parity-crypto=0.6 from zksync_eth_client
ethsign = { git = "https://github.com/tomusdrw/ethsign.git", rev = "1b93031d2c45f3a3313e5418876b3d3e751ba87e" }
# TODO: Upgrade to latest = 0.8 after zksync_eth_client is updated
# ethsign = "0.8"
ethsign = "0.8"
futures = "0.3"
log = "0.4"
promptly = "0.3.0"
Expand Down
9 changes: 8 additions & 1 deletion core/identity/src/id_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::convert::TryFrom;

use anyhow::Context;
use ethsign::keyfile::Bytes;
use ethsign::{KeyFile, Protected, SecretKey};
use ethsign::{KeyFile, Protected, PublicKey, SecretKey};
use rand::Rng;
use ya_client_model::NodeId;

Expand Down Expand Up @@ -33,6 +33,13 @@ impl IdentityKey {
std::mem::replace(&mut self.alias, new_alias)
}

pub fn to_pub_key(&self) -> Result<PublicKey, Error> {
match &self.secret {
Some(secret) => Ok(secret.public()),
None => Err(Error::internal("key locked")),
}
}

pub fn to_key_file(&self) -> Result<String, serde_json::Error> {
serde_json::to_string_pretty(&self.key_file)
}
Expand Down
21 changes: 20 additions & 1 deletion core/identity/src/service/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;

use actix_rt::Arbiter;
use chrono::Utc;
use ethsign::{KeyFile, Protected};
use ethsign::{KeyFile, Protected, PublicKey};
use futures::lock::Mutex;
use futures::prelude::*;
use ya_client_model::NodeId;
Expand Down Expand Up @@ -367,6 +367,14 @@ impl IdentityService {
Ok(model::Ack {})
}

pub async fn get_pub_key(
&mut self,
key_id: model::GetPubKey,
) -> Result<PublicKey, model::Error> {
let key = self.get_key_by_id(&key_id.0)?;
key.to_pub_key().map_err(|e| model::Error::new_err_msg(e))
}

pub async fn get_key_file(
&mut self,
key_id: model::GetKeyFile,
Expand Down Expand Up @@ -479,6 +487,17 @@ impl IdentityService {
async move { this.lock().await.subscribe(subscribe).await }
});
let this = me.clone();
let _ = bus::bind(model::BUS_ID, move |node_id: model::GetPubKey| {
let this = this.clone();
async move {
this.lock()
.await
.get_pub_key(node_id)
.await
.map(|key| key.bytes().to_vec())
}
});
let this = me.clone();
let _ = bus::bind(model::BUS_ID, move |node_id: model::GetKeyFile| {
let this = this.clone();
async move { this.lock().await.get_key_file(node_id).await }
Expand Down
19 changes: 5 additions & 14 deletions core/market/src/protocol/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ use tokio::time::delay_for;

use ya_client::model::NodeId;
use ya_core_model::market::BUS_ID;
use ya_core_model::net::local as local_net;
use ya_core_model::net::local::{BroadcastMessage, SendBroadcastMessage};
use ya_net::{self as net, RemoteEndpoint};
use ya_service_bus::timeout::{IntoDuration, IntoTimeoutFuture};
use ya_service_bus::typed::ServiceBinder;
use ya_service_bus::{typed as bus, Error as BusError, RpcEndpoint, RpcMessage};
use ya_service_bus::{Error as BusError, RpcEndpoint, RpcMessage};

use super::callback::HandlerSlot;
use crate::config::DiscoveryConfig;
Expand Down Expand Up @@ -110,16 +109,12 @@ impl Discovery {
};
let size = offer_ids.len();
log::debug!("Broadcasting offers. count={}", size);
let bcast_msg = SendBroadcastMessage::new(OffersBcast { offer_ids });

counter!("market.offers.broadcasts.net", 1);
value!("market.offers.broadcasts.len", size as u64);

// TODO: We shouldn't use send_as. Put identity inside broadcasted message instead.
if let Err(e) = bus::service(local_net::BUS_ID)
.send_as(default_id, bcast_msg) // TODO: should we send as our (default) identity?
.await
{
// TODO: should we send as our (default) identity?
if let Err(e) = net::broadcast(default_id, OffersBcast { offer_ids }).await {
mfranciszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
log::error!("Error sending bcast, skipping... error={:?}", e);
counter!("market.offers.broadcasts.net_errors", 1);
};
Expand Down Expand Up @@ -209,15 +204,11 @@ impl Discovery {

let size = offer_ids.len();
log::debug!("Broadcasting unsubscribes. count={}", size);
let bcast_msg = SendBroadcastMessage::new(UnsubscribedOffersBcast { offer_ids });
counter!("market.offers.unsubscribes.broadcasts.net", 1);
value!("market.offers.unsubscribes.broadcasts.len", size as u64);

// TODO: We shouldn't use send_as. Put identity inside broadcasted message instead.
if let Err(e) = bus::service(local_net::BUS_ID)
.send_as(default_id, bcast_msg) // TODO: should we send as our (default) identity?
.await
{
// TODO: should we send as our (default) identity?
if let Err(e) = net::broadcast(default_id, UnsubscribedOffersBcast { offer_ids }).await {
log::error!("Error sending bcast, skipping... error={:?}", e);
counter!("market.offers.unsubscribes.broadcasts.net_errors", 1);
};
Expand Down
10 changes: 10 additions & 0 deletions core/model/src/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,16 @@ impl RpcMessage for Subscribe {
type Error = Error;
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetPubKey(pub NodeId);

impl RpcMessage for GetPubKey {
const ID: &'static str = "GetPubKey";
type Item = Vec<u8>;
type Error = Error;
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetKeyFile(pub NodeId);
Expand Down
11 changes: 11 additions & 0 deletions core/net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ edition = "2018"
[features]
default = []
service = []
hybrid-net = ["ya-net-server", "ya-relay-proto", "ya-sb-proto", "bytes", "ethsign", "tokio-util", "url", "prost", "rand"]

[dependencies]
ya-core-model = { version = "^0.4", features=["net", "identity"] }
ya-net-server = { git = "https://github.com/golemfactory/ya-relay.git", branch = "mf/ya-integration", optional = true }
ya-relay-proto = { git = "https://github.com/golemfactory/ya-relay.git", branch = "mf/ya-integration", features = ["codec"], optional = true }
ya-sb-proto = { version = "0.4", optional = true }
ya-service-api = "0.1"
ya-service-api-interfaces = "0.1"
ya-service-bus = "0.4"
Expand All @@ -25,6 +29,13 @@ serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "0.2", features = ["time"] }

bytes = { version = "0.5", optional = true }
ethsign = { version = "0.8", optional = true }
tokio-util = { version = "0.3", optional = true }
url = { version = "2.2", optional = true }
prost = { version = "0.6", optional = true }
rand = { version = "0.7", optional = true}

[dev-dependencies]
ya-sb-proto = "0.4"
ya-sb-router = "0.4"
Expand Down
105 changes: 0 additions & 105 deletions core/net/src/api.rs

This file was deleted.

13 changes: 6 additions & 7 deletions core/net/src/bcast.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
// Broadcast support service

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use ya_core_model::net::local as local_net;

#[derive(Clone, Default)]
pub struct BCastService {
inner: Rc<RefCell<BCastServiceInner>>,
inner: Arc<Mutex<BCastServiceInner>>,
mfranciszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Default)]
struct BCastServiceInner {
last_id: u64,
topics: BTreeMap<String, Vec<(u64, Rc<str>)>>,
topics: BTreeMap<String, Vec<(u64, Arc<str>)>>,
}

impl BCastService {
pub fn add(&self, subscribe: local_net::Subscribe) -> (bool, u64) {
let mut me = self.inner.borrow_mut();
let mut me = self.inner.lock().unwrap();
let id = me.last_id;
let receivers = me
.topics
Expand All @@ -31,8 +30,8 @@ impl BCastService {
(is_new, id)
}

pub fn resolve(&self, topic: &str) -> Vec<Rc<str>> {
let me = self.inner.borrow();
pub fn resolve(&self, topic: &str) -> Vec<Arc<str>> {
let me = self.inner.lock().unwrap();
me.topics
.get(topic)
.map(|receivers| {
Expand Down
67 changes: 67 additions & 0 deletions core/net/src/central/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use ya_core_model::net;
use ya_core_model::net::local::ToEndpoint;
use ya_core_model::net::local::{BindBroadcastError, BroadcastMessage, SendBroadcastMessage};
use ya_service_bus::{typed as bus, RpcEndpoint, RpcMessage};

use crate::central::SUBSCRIPTIONS;

pub async fn broadcast<M, S>(
caller: S,
message: M,
) -> Result<
Result<
<SendBroadcastMessage<M> as RpcMessage>::Item,
<SendBroadcastMessage<M> as RpcMessage>::Error,
>,
ya_service_bus::Error,
>
where
M: BroadcastMessage + Send + Sync + Unpin + 'static,
S: ToString + 'static,
{
// TODO: We shouldn't use send_as. Put identity inside broadcasted message instead.
bus::service(net::local::BUS_ID)
.send_as(caller, SendBroadcastMessage::new(message))
.await
}

pub async fn bind_broadcast_with_caller<M, T, F>(
broadcast_address: &str,
handler: F,
) -> Result<(), BindBroadcastError>
where
M: BroadcastMessage + Send + Sync + 'static,
T: std::future::Future<
Output = Result<
<SendBroadcastMessage<M> as RpcMessage>::Item,
<SendBroadcastMessage<M> as RpcMessage>::Error,
>,
> + 'static,
F: FnMut(String, SendBroadcastMessage<M>) -> T + 'static,
{
log::debug!("Creating broadcast topic {}.", M::TOPIC);

// We send Subscribe message to local net, which will create Topic
// and add broadcast_address to be endpoint, which will be called, when someone
// will broadcast any Message related to this Topic.
let subscribe_msg = M::into_subscribe_msg(broadcast_address);
{
let mut subscriptions = SUBSCRIPTIONS.lock().unwrap();
subscriptions.insert(subscribe_msg.clone());
}

bus::service(net::local::BUS_ID)
.send(subscribe_msg)
.await??;

log::debug!(
"Binding handler '{}' for broadcast topic {}.",
broadcast_address,
M::TOPIC
);

// We created endpoint address above. Now we must add handler, which will
// handle broadcasts forwarded to this address.
bus::bind_with_caller(broadcast_address, handler);
Ok(())
}
Loading