Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(state-sync) State sync actors #9669

Merged
merged 5 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::client::{Client, EPOCH_START_INFO_BLOCKS};
use crate::config_updater::ConfigUpdater;
use crate::debug::new_network_info_view;
use crate::info::{display_sync_status, InfoHelper};
use crate::sync::adapter::SyncMessage;
use crate::sync::state::{StateSync, StateSyncResult};
use crate::sync_jobs_actor::{create_sync_job_scheduler, SyncJobsActor};
use crate::{metrics, StatusResponse};
Expand Down Expand Up @@ -1857,6 +1858,18 @@ impl Handler<WithSpanContext<GetClientConfig>> for ClientActor {
}
}

/// Receives `SyncMessage`s addressed to the client actor.
///
/// This is currently a stub: the message is logged at debug level and then dropped.
impl Handler<WithSpanContext<SyncMessage>> for ClientActor {
type Result = ();

#[perf]
fn handle(&mut self, msg: WithSpanContext<SyncMessage>, _: &mut Context<Self>) -> Self::Result {
// NOTE(review): `handler_debug_span!` presumably opens a tracing span and unwraps the
// span-context envelope; `_span` stays bound until the end of the handler — confirm.
let (_span, msg) = handler_debug_span!(target: "client", msg);
tracing::debug!(target: "client", ?msg);
// TODO
// process messages from SyncActors
}
}

/// Returns random seed sampled from the current thread
pub fn random_seed_from_thread() -> RngSeed {
let mut rng_seed: RngSeed = [0; 32];
Expand Down
4 changes: 2 additions & 2 deletions chain/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ pub use near_client_primitives::types::{
QueryError, Status, StatusResponse, SyncStatus, TxStatus, TxStatusError,
};

pub use near_client_primitives::debug::DebugStatus;

pub use crate::adapter::{
BlockApproval, BlockResponse, ProcessTxRequest, ProcessTxResponse, SetNetworkInfo,
};
Expand All @@ -18,7 +16,9 @@ pub use crate::client::Client;
pub use crate::client_actor::NetworkAdversarialMessage;
pub use crate::client_actor::{start_client, ClientActor};
pub use crate::config_updater::ConfigUpdater;
pub use crate::sync::adapter::{SyncAdapter, SyncMessage};
pub use crate::view_client::{start_view_client, ViewClientActor};
pub use near_client_primitives::debug::DebugStatus;

pub mod adapter;
pub mod adversarial;
Expand Down
110 changes: 110 additions & 0 deletions chain/client/src/sync/adapter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use super::sync_actor::SyncActor;
use actix::{dev::ToEnvelope, Actor, MailboxError, Message};
use core::fmt::Debug;
use near_async::messaging::Sender;
use near_network::types::{
PeerManagerMessageRequest, StateSync as NetworkStateSync, StateSyncResponse,
};
use near_o11y::WithSpanContextExt;
use near_primitives::hash::CryptoHash;
use near_store::ShardUId;
use std::collections::HashMap;
use tracing::warn;

/// Information about the shard being synced
///
/// Carried as the payload of both `SyncMessage` variants.
#[derive(Debug)]
pub struct SyncShardInfo {
/// Identifier of the shard this information refers to.
pub shard_uid: ShardUId,
/// Hash associated with the sync; a `SyncActor` stores it as its own `sync_hash`
/// when it receives `SyncMessage::StartSync`.
pub sync_hash: CryptoHash,
}

/// Messages between Client and Sync Actor
///
/// The actix result type is `()`, so handlers of this message return no reply value.
#[derive(Message, Debug)]
#[rtype(result = "()")]
pub enum SyncMessage {
/// Notify an active actor to start syncing
///
/// A `SyncActor` that receives this asserts that the shard matches its own.
StartSync(SyncShardInfo),
/// Notify the client that the work is done
///
/// A `SyncActor` that receives this variant only logs a warning.
SyncDone(SyncShardInfo),
}

/// Bookkeeping for one started `SyncActor`: its mailbox address together with the
/// arbiter it was started in.
struct ActorHandler {
/// Address of actor mailbox
addr: actix::Addr<SyncActor>,
/// Thread handler of actor
arbiter: actix::Arbiter,
}

/// Manager for state sync threads.
/// Offers functions to interact with the sync actors, such as start, stop and send messages.
pub struct SyncAdapter {
/// Handlers (mailbox address and arbiter) of the sync actors, indexed by `ShardUId`
actor_handler_map: HashMap<ShardUId, ActorHandler>,
/// Message channel for client; a clone is handed to every actor created by `start`
client_adapter: Sender<SyncMessage>,
/// Message channel with network; a clone is handed to every actor created by `start`
network_adapter: Sender<PeerManagerMessageRequest>,
}

impl SyncAdapter {
pub fn new(
client_adapter: Sender<SyncMessage>,
network_adapter: Sender<PeerManagerMessageRequest>,
) -> Self {
Self { actor_handler_map: [].into(), client_adapter, network_adapter }
}

/// Starts a new arbiter and runs the actor on it
pub fn start(&mut self, shard_uid: ShardUId) {
assert!(!self.actor_handler_map.contains_key(&shard_uid), "Actor already started.");
let arbiter = actix::Arbiter::new();
VanBarbascu marked this conversation as resolved.
Show resolved Hide resolved
let arbiter_handle = arbiter.handle();
let client = self.client_adapter.clone();
let network = self.network_adapter.clone();
let addr = SyncActor::start_in_arbiter(&arbiter_handle, move |_ctx| {
SyncActor::new(shard_uid, client, network)
});
self.actor_handler_map.insert(shard_uid, ActorHandler { addr, arbiter });
}

/// Stop the actor and remove it
///
/// The handler is taken out of the map of started actors before its arbiter is asked
/// to stop. After this call `start` may be used again for the same shard, and `send`
/// treats the shard as having no actor.
///
/// # Panics
///
/// Panics with "Actor not started." if no actor is registered for `shard_uid`.
pub fn stop(&mut self, shard_uid: ShardUId) {
    // Removing (rather than only looking up) the entry is what makes the
    // "and remove it" part of the contract hold.
    let handler = self.actor_handler_map.remove(&shard_uid).expect("Actor not started.");
    handler.arbiter.stop();
}

/// Forward message to the right shard
async fn send<M>(&mut self, shard_uid: ShardUId, msg: M)
where
M: Message + Send + 'static,
M::Result: Send,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The empty line seems unnecessary.

SyncActor: actix::Handler<M>,
<SyncActor as Actor>::Context: ToEnvelope<SyncActor, M>,
{
let handler = self.actor_handler_map.get_mut(&shard_uid);
match handler {
None => {
warn!(target: "sync", ?shard_uid, "Tried sending message to non existing actor.")
}
Some(handler) => match handler.addr.send(msg).await {
Ok(_) => {}
Err(MailboxError::Closed) => {
warn!(target: "sync", ?shard_uid, "Error sending message, mailbox is closed.")
}
Err(MailboxError::Timeout) => {
warn!(target: "sync", ?shard_uid, "Error sending message, timeout.")
}
},
}
}
}

/// Interface for network
///
/// Lets the network side hand a `StateSyncResponse` to the sync actor of a shard.
#[async_trait::async_trait]
impl NetworkStateSync for SyncAdapter {
/// Wraps `msg` with a span context and forwards it to the actor of `shard_uid`.
async fn send(&mut self, shard_uid: ShardUId, msg: StateSyncResponse) {
// Not a recursive call: the argument is the span-context-wrapped message, which only
// the inherent generic `SyncAdapter::send` accepts (inherent methods take precedence
// over trait methods during method resolution).
self.send(shard_uid, msg.with_span_context()).await;
}
}
2 changes: 2 additions & 0 deletions chain/client/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub mod adapter;
pub mod block;
pub mod epoch;
pub mod external;
pub mod header;
pub mod state;
pub mod sync_actor;
109 changes: 109 additions & 0 deletions chain/client/src/sync/sync_actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use super::adapter::{SyncMessage as ClientSyncMessage, SyncShardInfo};
use near_async::messaging::Sender;
use near_network::types::{PeerManagerMessageRequest, StateSyncResponse};
use near_o11y::{handler_debug_span, OpenTelemetrySpanExt, WithSpanContext};
use near_performance_metrics_macros::perf;
use near_primitives::hash::CryptoHash;
use near_store::ShardUId;
use tracing::{debug, info, warn};

/// Message channels
///
/// Groups the two senders a `SyncActor` is constructed with.
// Neither field is read in the code visible here, hence the lint suppression below.
#[allow(dead_code)]
struct MessageSenders {
VanBarbascu marked this conversation as resolved.
Show resolved Hide resolved
/// Used to send messages to client
client_adapter: Sender<ClientSyncMessage>,
/// Used to send messages to peer manager
network_adapter: Sender<PeerManagerMessageRequest>,
}

/// Actor that runs state sync for a shard
// `sync_hash` is only written and `senders` is only stored in the code visible here,
// hence the lint suppression below.
#[allow(dead_code)]
pub struct SyncActor {
/// Shard being synced
shard_uid: ShardUId,
/// Hash of the state that is downloaded; overwritten when a `StartSync` message arrives
sync_hash: CryptoHash,
/// Channels used to communicate with other actors
senders: MessageSenders,
}

impl SyncActor {
pub fn new(
shard_uid: ShardUId,
client_adapter: Sender<ClientSyncMessage>,
network_adapter: Sender<PeerManagerMessageRequest>,
) -> Self {
Self {
shard_uid,
sync_hash: CryptoHash::new(),
VanBarbascu marked this conversation as resolved.
Show resolved Hide resolved
senders: MessageSenders { client_adapter, network_adapter },
}
}

/// Handles a message coming from the client.
///
/// * `StartSync` — logs the start and records the received `sync_hash`; the actual
///   state sync logic is still to be added.
/// * `SyncDone` — not supported by this actor; only a warning is logged.
///
/// # Panics
///
/// Panics if a `StartSync` message names a shard other than the one this actor was
/// created for.
fn handle_client_sync_message(&mut self, msg: ClientSyncMessage) {
    match msg {
        ClientSyncMessage::StartSync(SyncShardInfo { sync_hash, shard_uid }) => {
            assert_eq!(shard_uid, self.shard_uid, "Message is not for this shard SyncActor");
            // Start syncing the shard.
            info!(target: "sync", shard_id = ?self.shard_uid.shard_id, "Starting sync on shard");
            // TODO: Add logic to commence state sync.
            self.sync_hash = sync_hash;
        }
        ClientSyncMessage::SyncDone(_) => {
            warn!(target: "sync", "Unsupported message received by SyncActor: SyncDone.");
        }
    }
}

/// Handles a state sync response coming from the network.
///
/// * `HeaderResponse` — only logged at debug level for now.
/// * `PartResponse` — not supported yet; a warning naming the variant is logged.
fn handle_network_sync_message(&mut self, msg: StateSyncResponse) {
    match msg {
        StateSyncResponse::HeaderResponse => {
            debug!(target: "sync", shard_id = ?self.shard_uid.shard_id, "Got header response");
        }
        StateSyncResponse::PartResponse => {
            // The warning must name the variant that was actually received.
            warn!(target: "sync", "Unsupported message received by SyncActor: PartResponse.");
        }
    }
}
}

/// Control the flow of the state sync actor
///
/// Both lifecycle hooks only log; no work is started or cleaned up here.
impl actix::Actor for SyncActor {
type Context = actix::Context<Self>;

/// Lifecycle hook: logs that the actor of this shard has started.
fn started(&mut self, _ctx: &mut Self::Context) {
info!(target: "sync", shard_id = ?self.shard_uid.shard_id, "Sync actor started.");
}

/// Lifecycle hook: logs that the actor of this shard has stopped.
fn stopped(&mut self, _ctx: &mut Self::Context) {
info!(target: "sync", shard_id = ?self.shard_uid.shard_id, "Sync actor stopped.");
}
}

/// Process messages from client
///
/// Unwraps the span-context envelope and delegates to `handle_client_sync_message`.
impl actix::Handler<WithSpanContext<ClientSyncMessage>> for SyncActor {
type Result = ();
#[perf]
fn handle(
&mut self,
msg: WithSpanContext<ClientSyncMessage>,
_ctx: &mut Self::Context,
) -> Self::Result {
// NOTE(review): `handler_debug_span!` presumably opens a tracing span and yields the
// inner message; `_span` stays bound until the handler returns — confirm.
let (_span, msg) = handler_debug_span!(target: "sync", msg);
self.handle_client_sync_message(msg);
VanBarbascu marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Process messages from network
///
/// Unwraps the span-context envelope and delegates to `handle_network_sync_message`.
impl actix::Handler<WithSpanContext<StateSyncResponse>> for SyncActor {
type Result = ();
#[perf]
fn handle(
&mut self,
msg: WithSpanContext<StateSyncResponse>,
_ctx: &mut Self::Context,
) -> Self::Result {
// NOTE(review): `handler_debug_span!` presumably opens a tracing span and yields the
// inner message; `_span` stays bound until the handler returns — confirm.
let (_span, msg) = handler_debug_span!(target: "sync", msg);
self.handle_network_sync_message(msg);
}
}
1 change: 1 addition & 0 deletions chain/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod debug;
pub mod raw;
pub mod routing;
pub mod shards_manager;
pub mod state_sync;
pub mod tcp;
pub mod test_loop;
pub mod test_utils;
Expand Down
17 changes: 17 additions & 0 deletions chain/network/src/state_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use near_store::ShardUId;

/// State sync response from peers.
///
/// Both variants are unit variants, so no payload travels with the message yet.
/// The actix result type is `()`: handlers return no reply value.
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub enum StateSyncResponse {
/// Presumably the reply to a state header request — TODO confirm against the sender.
HeaderResponse,
/// Presumably the reply to a state part request — TODO confirm against the sender.
PartResponse,
}

/// A strongly typed asynchronous API for the State Sync logic
/// It abstracts away the fact that it is implemented using actix
/// actors.
///
/// Implementors must be `Send + Sync + 'static`.
#[async_trait::async_trait]
pub trait StateSync: Send + Sync + 'static {
/// Delivers `msg` to the state sync logic of the shard identified by `shard_uid`.
///
/// The method returns nothing, so a delivery failure cannot be reported to the caller
/// through it.
async fn send(&mut self, shard_uid: ShardUId, msg: StateSyncResponse);
}
14 changes: 7 additions & 7 deletions chain/network/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@ pub use crate::network_protocol::{
Disconnect, Encoding, Handshake, HandshakeFailureReason, PeerMessage, RoutingTableUpdate,
SignedAccountData,
};
/// Exported types, which are part of network protocol.
pub use crate::network_protocol::{
Edge, PartialEdgeInfo, PartialEncodedChunkForwardMsg, PartialEncodedChunkRequestMsg,
PartialEncodedChunkResponseMsg, PeerChainInfoV2, PeerInfo, StateResponseInfo,
StateResponseInfoV1, StateResponseInfoV2,
};
use crate::routing::routing_table_view::RoutingTableInfo;
pub use crate::state_sync::{StateSync, StateSyncResponse};
use near_async::messaging::{
AsyncSender, CanSend, CanSendAsync, IntoAsyncSender, IntoSender, Sender,
};
Expand All @@ -22,13 +29,6 @@ use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;

/// Exported types, which are part of network protocol.
pub use crate::network_protocol::{
Edge, PartialEdgeInfo, PartialEncodedChunkForwardMsg, PartialEncodedChunkRequestMsg,
PartialEncodedChunkResponseMsg, PeerChainInfoV2, PeerInfo, StateResponseInfo,
StateResponseInfoV1, StateResponseInfoV2,
};

/// Number of hops a message is allowed to travel before being dropped.
/// This is used to avoid infinite loop because of inconsistent view of the network
/// by different nodes.
Expand Down
21 changes: 20 additions & 1 deletion nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use near_async::time;
use near_chain::state_snapshot_actor::{get_make_snapshot_callback, StateSnapshotActor};
use near_chain::types::RuntimeAdapter;
use near_chain::{Chain, ChainGenesis};
use near_chain_configs::SyncConfig;
use near_chunks::shards_manager_actor::start_shards_manager;
use near_client::sync::adapter::SyncAdapter;
use near_client::{start_client, start_view_client, ClientActor, ConfigUpdater, ViewClientActor};
use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig};
use near_epoch_manager::EpochManager;
Expand Down Expand Up @@ -287,6 +289,18 @@ pub fn start_with_config_and_synchronization(
hash: *genesis_block.header().hash(),
};

// State Sync actors
let client_adapter_for_sync = Arc::new(LateBoundSender::default());
VanBarbascu marked this conversation as resolved.
Show resolved Hide resolved
let network_adapter_for_sync = Arc::new(LateBoundSender::default());
let _sync_adapter = Arc::new(if let SyncConfig::Peers = config.client_config.state_sync.sync {
Some(SyncAdapter::new(
client_adapter_for_sync.as_sender(),
network_adapter_for_sync.as_sender(),
))
} else {
None
});

let node_id = config.network_config.node_id();
let network_adapter = Arc::new(LateBoundSender::default());
let shards_manager_adapter = Arc::new(LateBoundSender::default());
Expand Down Expand Up @@ -329,6 +343,9 @@ pub fn start_with_config_and_synchronization(
adv,
config_updater,
);
if let SyncConfig::Peers = config.client_config.state_sync.sync {
client_adapter_for_sync.bind(client_actor.clone().with_auto_span_context())
};
client_adapter_for_shards_manager.bind(client_actor.clone().with_auto_span_context());
let (shards_manager_actor, shards_manager_arbiter_handle) = start_shards_manager(
epoch_manager.clone(),
Expand Down Expand Up @@ -370,7 +387,9 @@ pub fn start_with_config_and_synchronization(
)
.context("PeerManager::spawn()")?;
network_adapter.bind(network_actor.clone().with_auto_span_context());

if let SyncConfig::Peers = config.client_config.state_sync.sync {
network_adapter_for_sync.bind(network_actor.clone().with_auto_span_context())
}
#[cfg(feature = "json_rpc")]
if let Some(rpc_config) = config.rpc_config {
let entity_debug_handler = EntityDebugHandlerImpl {
Expand Down