Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: dont keep persistent GatewayClient inside NMv1 #5211

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion common/client-libs/gateway-client/src/bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,10 @@ impl ClientBandwidth {

if remaining < 0 {
tracing::warn!("OUT OF BANDWIDTH. remaining: {remaining_bi2}");
} else {
} else if remaining < 1_000_000 {
tracing::info!("remaining bandwidth: {remaining_bi2}");
} else {
tracing::debug!("remaining bandwidth: {remaining_bi2}");
}

self.inner
Expand Down
24 changes: 5 additions & 19 deletions common/client-libs/gateway-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ impl<C, St> GatewayClient<C, St> {
self.gateway_identity
}

pub fn shared_key(&self) -> Option<Arc<SharedGatewayKey>> {
self.shared_key.clone()
}

pub fn ws_fd(&self) -> Option<RawFd> {
match &self.connection {
SocketState::Available(conn) => ws_fd(conn.as_ref()),
Expand Down Expand Up @@ -402,7 +406,7 @@ impl<C, St> GatewayClient<C, St> {
}

Some(_) => {
info!("the gateway is using exactly the same (or older) protocol version as we are. We're good to continue!");
debug!("the gateway is using exactly the same (or older) protocol version as we are. We're good to continue!");
Ok(())
}
}
Expand Down Expand Up @@ -976,24 +980,6 @@ impl<C, St> GatewayClient<C, St> {
}
Ok(())
}

#[deprecated(note = "this method does not deal with upgraded keys for legacy clients")]
pub async fn authenticate_and_start(
&mut self,
) -> Result<AuthenticationResponse, GatewayClientError>
where
C: DkgQueryClient + Send + Sync,
St: CredentialStorage,
<St as CredentialStorage>::StorageError: Send + Sync + 'static,
{
let shared_key = self.perform_initial_authentication().await?;
self.claim_initial_bandwidth().await?;

// this call is NON-blocking
self.start_listening_for_mixnet_messages()?;

Ok(shared_key)
}
}

// type alias for an ease of use
Expand Down
5 changes: 5 additions & 0 deletions common/client-libs/gateway-client/src/socket_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ impl PartiallyDelegatedRouter {
}
};

if self.stream_return.is_canceled() {
// nothing to do, receiver has been dropped
return;
}

let return_res = match ret {
Err(err) => self.stream_return.send(Err(err)),
Ok(_) => {
Expand Down
9 changes: 9 additions & 0 deletions common/crypto/src/asymmetric/identity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use ed25519_dalek::{Signer, SigningKey};
pub use ed25519_dalek::{Verifier, PUBLIC_KEY_LENGTH, SECRET_KEY_LENGTH, SIGNATURE_LENGTH};
use nym_pemstore::traits::{PemStorableKey, PemStorableKeyPair};
use std::fmt::{self, Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::str::FromStr;
use thiserror::Error;
use zeroize::{Zeroize, ZeroizeOnDrop};
Expand Down Expand Up @@ -122,6 +123,14 @@ impl PemStorableKeyPair for KeyPair {
#[derive(Copy, Clone, Eq, PartialEq)]
pub struct PublicKey(ed25519_dalek::VerifyingKey);

impl Hash for PublicKey {
fn hash<H: Hasher>(&self, state: &mut H) {
// each public key has unique bytes representation which can be used
// for the hash implementation
self.to_bytes().hash(state)
}
}

impl Display for PublicKey {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
Display::fmt(&self.to_base58_string(), f)
Expand Down
20 changes: 9 additions & 11 deletions nym-api/src/network_monitor/gateways_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
// SPDX-License-Identifier: GPL-3.0-only

use futures::Stream;
use nym_crypto::asymmetric::identity;
use nym_crypto::asymmetric::{ed25519, identity};
use nym_gateway_client::{AcknowledgementReceiver, MixnetMessageReceiver};
use nym_mixnet_contract_common::IdentityKey;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_stream::StreamMap;
Expand All @@ -15,8 +14,8 @@ pub(crate) enum GatewayMessages {
}

pub(crate) struct GatewaysReader {
ack_map: StreamMap<IdentityKey, AcknowledgementReceiver>,
stream_map: StreamMap<IdentityKey, MixnetMessageReceiver>,
ack_map: StreamMap<ed25519::PublicKey, AcknowledgementReceiver>,
stream_map: StreamMap<ed25519::PublicKey, MixnetMessageReceiver>,
}

impl GatewaysReader {
Expand All @@ -33,19 +32,18 @@ impl GatewaysReader {
message_receiver: MixnetMessageReceiver,
ack_receiver: AcknowledgementReceiver,
) {
let channel_id = id.to_string();
self.stream_map.insert(channel_id.clone(), message_receiver);
self.ack_map.insert(channel_id, ack_receiver);
self.stream_map.insert(id, message_receiver);
self.ack_map.insert(id, ack_receiver);
}

pub fn remove_receivers(&mut self, id: &str) {
self.stream_map.remove(id);
self.ack_map.remove(id);
pub fn remove_receivers(&mut self, id: ed25519::PublicKey) {
self.stream_map.remove(&id);
self.ack_map.remove(&id);
}
}

impl Stream for GatewaysReader {
type Item = (IdentityKey, GatewayMessages);
type Item = (ed25519::PublicKey, GatewayMessages);

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.ack_map).poll_next(cx) {
Expand Down
46 changes: 22 additions & 24 deletions nym-api/src/network_monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use crate::node_status_api::NodeStatusCache;
use crate::nym_contract_cache::cache::NymContractCache;
use crate::storage::NymApiStorage;
use crate::support::caching::cache::SharedCache;
use crate::support::{config, nyxd};
use crate::support::config::Config;
use crate::support::nyxd;
use futures::channel::mpsc;
use nym_bandwidth_controller::BandwidthController;
use nym_credential_storage::persistent_storage::PersistentStorage;
Expand All @@ -36,7 +37,7 @@ pub(crate) mod test_route;
pub(crate) const ROUTE_TESTING_TEST_NONCE: u64 = 0;

pub(crate) fn setup<'a>(
config: &'a config::NetworkMonitor,
config: &'a Config,
nym_contract_cache: &NymContractCache,
described_cache: SharedCache<DescribedNodes>,
node_status_cache: NodeStatusCache,
Expand All @@ -54,7 +55,7 @@ pub(crate) fn setup<'a>(
}

pub(crate) struct NetworkMonitorBuilder<'a> {
config: &'a config::NetworkMonitor,
config: &'a Config,
nyxd_client: nyxd::Client,
node_status_storage: NymApiStorage,
contract_cache: NymContractCache,
Expand All @@ -64,7 +65,7 @@ pub(crate) struct NetworkMonitorBuilder<'a> {

impl<'a> NetworkMonitorBuilder<'a> {
pub(crate) fn new(
config: &'a config::NetworkMonitor,
config: &'a Config,
nyxd_client: nyxd::Client,
node_status_storage: NymApiStorage,
contract_cache: NymContractCache,
Expand All @@ -81,7 +82,7 @@ impl<'a> NetworkMonitorBuilder<'a> {
}
}

pub(crate) async fn build<R: MessageReceiver + Send + 'static>(
pub(crate) async fn build<R: MessageReceiver + Send + Sync + 'static>(
self,
) -> NetworkMonitorRunnables<R> {
// TODO: those keys change constant throughout the whole execution of the monitor.
Expand All @@ -101,7 +102,7 @@ impl<'a> NetworkMonitorBuilder<'a> {
self.contract_cache,
self.described_cache,
self.node_status_cache,
self.config.debug.per_node_test_packets,
self.config.network_monitor.debug.per_node_test_packets,
Arc::clone(&ack_key),
*identity_keypair.public_key(),
*encryption_keypair.public_key(),
Expand All @@ -110,35 +111,38 @@ impl<'a> NetworkMonitorBuilder<'a> {
let bandwidth_controller = {
BandwidthController::new(
nym_credential_storage::initialise_persistent_storage(
&self.config.storage_paths.credentials_database_path,
&self
.config
.network_monitor
.storage_paths
.credentials_database_path,
)
.await,
self.nyxd_client.clone(),
)
};

let packet_sender = new_packet_sender(
self.config,
&self.config,
gateway_status_update_sender,
Arc::clone(&identity_keypair),
self.config.debug.gateway_sending_rate,
bandwidth_controller,
self.config.debug.disabled_credentials_mode,
);

let received_processor = new_received_processor(
received_processor_receiver_channel,
Arc::clone(&encryption_keypair),
ack_key,
);
let summary_producer = new_summary_producer(self.config.debug.per_node_test_packets);
let summary_producer =
new_summary_producer(self.config.network_monitor.debug.per_node_test_packets);
let packet_receiver = new_packet_receiver(
gateway_status_update_receiver,
received_processor_sender_channel,
);

let monitor = Monitor::new(
self.config,
&self.config.network_monitor,
packet_preparer,
packet_sender,
received_processor,
Expand All @@ -154,12 +158,12 @@ impl<'a> NetworkMonitorBuilder<'a> {
}
}

pub(crate) struct NetworkMonitorRunnables<R: MessageReceiver + Send + 'static> {
pub(crate) struct NetworkMonitorRunnables<R: MessageReceiver + Send + Sync + 'static> {
monitor: Monitor<R>,
packet_receiver: PacketReceiver,
}

impl<R: MessageReceiver + Send + 'static> NetworkMonitorRunnables<R> {
impl<R: MessageReceiver + Send + Sync + 'static> NetworkMonitorRunnables<R> {
// TODO: note, that is not exactly doing what we want, because when
// `ReceivedProcessor` is constructed, it already spawns a future
// this needs to be refactored!
Expand Down Expand Up @@ -194,22 +198,16 @@ fn new_packet_preparer(
}

fn new_packet_sender(
config: &config::NetworkMonitor,
config: &Config,
gateways_status_updater: GatewayClientUpdateSender,
local_identity: Arc<identity::KeyPair>,
max_sending_rate: usize,
bandwidth_controller: BandwidthController<nyxd::Client, PersistentStorage>,
disabled_credentials_mode: bool,
) -> PacketSender {
PacketSender::new(
config,
gateways_status_updater,
local_identity,
config.debug.gateway_response_timeout,
config.debug.gateway_connection_timeout,
config.debug.max_concurrent_gateway_clients,
max_sending_rate,
bandwidth_controller,
disabled_credentials_mode,
)
}

Expand All @@ -236,8 +234,8 @@ fn new_packet_receiver(

// TODO: 1) does it still have to have separate builder or could we get rid of it now?
// TODO: 2) how do we make it non-async as other 'start' methods?
pub(crate) async fn start<R: MessageReceiver + Send + 'static>(
config: &config::NetworkMonitor,
pub(crate) async fn start<R: MessageReceiver + Send + Sync + 'static>(
config: &Config,
nym_contract_cache: &NymContractCache,
described_cache: SharedCache<DescribedNodes>,
node_status_cache: NodeStatusCache,
Expand Down
54 changes: 54 additions & 0 deletions nym-api/src/network_monitor/monitor/gateway_client_handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only

use crate::network_monitor::monitor::receiver::{GatewayClientUpdate, GatewayClientUpdateSender};
use crate::support::nyxd;
use nym_credential_storage::persistent_storage::PersistentStorage;
use nym_gateway_client::GatewayClient;
use std::ops::{Deref, DerefMut};
use tracing::warn;

pub(crate) struct GatewayClientHandle {
client: GatewayClient<nyxd::Client, PersistentStorage>,
gateways_status_updater: GatewayClientUpdateSender,
}

impl GatewayClientHandle {
pub(crate) fn new(
client: GatewayClient<nyxd::Client, PersistentStorage>,
gateways_status_updater: GatewayClientUpdateSender,
) -> Self {
GatewayClientHandle {
client,
gateways_status_updater,
}
}
}

impl Drop for GatewayClientHandle {
fn drop(&mut self) {
if self
.gateways_status_updater
.unbounded_send(GatewayClientUpdate::Disconnect(
self.client.gateway_identity(),
))
.is_err()
{
warn!("fail to cleanly shutdown gateway connection")
}
}
}

impl Deref for GatewayClientHandle {
type Target = GatewayClient<nyxd::Client, PersistentStorage>;

fn deref(&self) -> &Self::Target {
&self.client
}
}

impl DerefMut for GatewayClientHandle {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.client
}
}
Loading
Loading