Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Choose net implementation (Hybrid/Central) using ENV variable #1666

Merged
merged 11 commits into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ edition = "2018"

[features]
default = ['erc20-driver', 'zksync-driver', 'gftp/bin']
hybrid-net = ['ya-net/hybrid-net']
static-openssl = ["openssl/vendored", "openssl-probe"]
dummy-driver = ['ya-dummy-driver']
erc20-driver = ['ya-erc20-driver']
zksync-driver = ['ya-zksync-driver']
tos = []
# Temporary to make goth integration tests work
hybrid-net = ['ya-net/hybrid-net']

[[bin]]
name = "yagna"
Expand Down
24 changes: 14 additions & 10 deletions core/net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ edition = "2018"
[features]
default = []
service = []
hybrid-net = ["ya-relay-client", "ya-relay-proto", "ya-sb-proto", "bytes", "ethsign", "tokio-util", "url", "prost", "rand"]
# Temporary to make goth integration tests work
hybrid-net = []

[dependencies]
ya-core-model = { version = "^0.4", features=["net", "identity"] }
ya-relay-client = { git = "https://github.com/golemfactory/ya-relay.git", rev = "1052108b786c75c996f9c6db8bc52d8ddab04c45", optional = true }
ya-relay-proto = { git = "https://github.com/golemfactory/ya-relay.git", rev = "1052108b786c75c996f9c6db8bc52d8ddab04c45", features = ["codec"], optional = true }
ya-sb-proto = { version = "0.4", optional = true }
ya-relay-client = { git = "https://github.com/golemfactory/ya-relay.git", rev = "1052108b786c75c996f9c6db8bc52d8ddab04c45" }
ya-relay-proto = { git = "https://github.com/golemfactory/ya-relay.git", rev = "1052108b786c75c996f9c6db8bc52d8ddab04c45", features = ["codec"] }
ya-sb-proto = { version = "0.4" }
ya-service-api = "0.1"
ya-service-api-interfaces = "0.1"
ya-service-bus = "0.4"
Expand All @@ -22,19 +23,22 @@ ya-utils-networking = "0.1"
actix-rt = "1.0"
anyhow = "1.0"
futures = "0.3"
humantime = "2.1"
lazy_static = "1.4"
log = "0.4"
metrics="0.12"
serde_json = "1.0"
structopt = "0.3"
strum = { version = "0.22", features = ["derive"] }
thiserror = "1.0"
tokio = { version = "0.2", features = ["time"] }

bytes = { version = "0.5", optional = true }
ethsign = { version = "0.8", optional = true }
tokio-util = { version = "0.3", optional = true }
url = { version = "2.2", optional = true }
prost = { version = "0.6", optional = true }
rand = { version = "0.7", optional = true}
bytes = { version = "0.5" }
ethsign = { version = "0.8" }
tokio-util = { version = "0.3" }
url = { version = "2.2" }
prost = { version = "0.6" }
rand = { version = "0.7"}

[dev-dependencies]
ya-sb-proto = "0.4"
Expand Down
1 change: 1 addition & 0 deletions core/net/src/central/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub use service::{bind_remote, Net};

use std::collections::HashSet;
use std::sync::{Arc, Mutex};

use ya_core_model::net::local::Subscribe;

lazy_static::lazy_static! {
Expand Down
3 changes: 2 additions & 1 deletion core/net/src/central/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use ya_utils_networking::resolver;
use crate::bcast::BCastService;
use crate::central::handler::CentralBusHandler;
use crate::central::SUBSCRIPTIONS;
use crate::config::Config;

const CENTRAL_ADDR_ENV_VAR: &str = "CENTRAL_NET_HOST";

Expand Down Expand Up @@ -387,7 +388,7 @@ impl Default for ReconnectContext {
pub struct Net;

impl Net {
pub async fn gsb<Context>(_: Context) -> anyhow::Result<()> {
pub async fn gsb<Context>(_: Context, _config: Config) -> anyhow::Result<()> {
let (default_id, ids) = crate::service::identities().await?;
log::info!("using default identity as network id: {:?}", default_id);

Expand Down
48 changes: 48 additions & 0 deletions core/net/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::time::Duration;
use structopt::StructOpt;
use strum::VariantNames;
use strum::{EnumString, EnumVariantNames, IntoStaticStr};

#[derive(StructOpt, EnumString, EnumVariantNames, IntoStaticStr, Clone)]
#[strum(serialize_all = "lowercase")]
pub enum NetType {
Central,
Hybrid,
}

#[derive(StructOpt, Clone)]
#[structopt(rename_all = "kebab-case")]
pub struct Config {
#[structopt(env = "YA_NET_TYPE", possible_values = NetType::VARIANTS, default_value = NetType::Central.into())]
pub net_type: NetType,
#[structopt(env = "YA_NET_DEFAULT_PING_INTERVAL", parse(try_from_str = humantime::parse_duration), default_value = "15s")]
pub ping_interval: Duration,
#[structopt(env = "YA_NET_RELAY_HOST", default_value = "127.0.0.1:7464")]
pub host: String,
}

impl Config {
pub fn from_env() -> Result<Config, structopt::clap::Error> {
// Empty command line arguments, because we want to use ENV fallback
// or default values if ENV variables are not set.
Ok(Config::from_iter_safe(&[""])?)
}
}

/// TODO: Remove compilation flag.
/// This conditional compilation is hack to make Goth integration tests work.
/// Current solution in Goth is to build separate binary with compilation flag.
/// This is only temporary for transition period, to make this PR as small as possible.
#[cfg(not(feature = "hybrid-net"))]
impl Default for NetType {
fn default() -> Self {
NetType::Central
}
}

#[cfg(feature = "hybrid-net")]
impl Default for NetType {
fn default() -> Self {
NetType::Hybrid
}
}
27 changes: 15 additions & 12 deletions core/net/src/hybrid/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use anyhow::Context as AnyhowContext;
use futures::channel::mpsc;
use futures::stream::LocalBoxStream;
use futures::{FutureExt, SinkExt, Stream, StreamExt, TryStreamExt};
use tokio::time::{self, Duration};
use tokio::time::{self};
use url::Url;

use ya_core_model::net::{self, net_service};
Expand All @@ -26,13 +26,12 @@ use ya_service_bus::{untyped as local_bus, Error, ResponseChunk};
use ya_utils_networking::resolver;

use crate::bcast::BCastService;
use crate::config::Config;
use crate::hybrid::codec;
use crate::hybrid::crypto::IdentityCryptoProvider;

const NET_RELAY_HOST_ENV_VAR: &str = "NET_RELAY_HOST";
const DEFAULT_NET_RELAY_HOST: &str = "127.0.0.1:7464";
const DEFAULT_BROADCAST_NODE_COUNT: u32 = 12;
const DEFAULT_PING_INTERVAL: Duration = Duration::from_millis(15000);

pub type BCastHandler = Box<dyn FnMut(String, &[u8]) + Send>;

Expand All @@ -55,8 +54,8 @@ thread_local! {
static CLIENT: RefCell<Option<Client>> = Default::default();
}

async fn relay_addr() -> std::io::Result<SocketAddr> {
Ok(match std::env::var(NET_RELAY_HOST_ENV_VAR) {
async fn relay_addr(config: &Config) -> std::io::Result<SocketAddr> {
Ok(match std::env::var(&config.host) {
Ok(val) => val,
Err(_) => resolver::resolve_yagna_srv_record("_net_relay._udp")
.await
Expand All @@ -71,21 +70,25 @@ async fn relay_addr() -> std::io::Result<SocketAddr> {
pub struct Net;

impl Net {
pub async fn gsb<Context>(_: Context) -> anyhow::Result<()> {
pub async fn gsb<Context>(_: Context, config: Config) -> anyhow::Result<()> {
let (default_id, ids) = crate::service::identities().await?;
start_network(default_id, ids).await?;
start_network(config, default_id, ids).await?;
Ok(())
}
}

// FIXME: examples compatibility
#[allow(unused)]
pub async fn bind_remote<T>(_: T, default_id: NodeId, ids: Vec<NodeId>) -> anyhow::Result<()> {
start_network(default_id, ids).await
start_network(Config::from_env()?, default_id, ids).await
}

pub async fn start_network(default_id: NodeId, ids: Vec<NodeId>) -> anyhow::Result<()> {
let url = Url::parse(&format!("udp://{}", relay_addr().await?))?;
pub async fn start_network(
config: Config,
default_id: NodeId,
ids: Vec<NodeId>,
) -> anyhow::Result<()> {
let url = Url::parse(&format!("udp://{}", relay_addr(&config).await?))?;
let provider = IdentityCryptoProvider::new(default_id);

log::info!("starting network (hybrid) with identity: {}", default_id);
Expand Down Expand Up @@ -137,10 +140,10 @@ pub async fn start_network(default_id: NodeId, ids: Vec<NodeId>) -> anyhow::Resu
tokio::task::spawn_local(broadcast_handler(brx));
tokio::task::spawn_local(forward_handler(receiver, state.clone()));

// Keep server connection alive by pinging every `DEFAULT_PING_INTERVAL` seconds.
// Keep server connection alive by pinging every `YA_NET_DEFAULT_PING_INTERVAL` seconds.
let client_ = client.clone();
tokio::task::spawn_local(async move {
let mut interval = time::interval(DEFAULT_PING_INTERVAL);
let mut interval = time::interval(config.ping_interval);
loop {
interval.tick().await;
if let Ok(session) = client_.server_session().await {
Expand Down
17 changes: 6 additions & 11 deletions core/net/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
#[cfg(not(feature = "hybrid-net"))]
pub use central::*;
#[cfg(feature = "hybrid-net")]
pub use hybrid::*;

pub use ya_core_model::net::{
from, NetApiError, NetDst, NetSrc, RemoteEndpoint, TryRemoteEndpoint,
};

#[cfg(any(feature = "service", test))]
pub use service::{bind_broadcast_with_caller, broadcast, Net};

mod bcast;
#[cfg(not(feature = "hybrid-net"))]
mod central;
#[cfg(feature = "hybrid-net")]
mod hybrid;
#[cfg(any(feature = "service", test))]
pub mod central;
pub mod hybrid;
mod service;

mod config;
Loading