diff --git a/data_structures/src/builders.rs b/data_structures/src/builders.rs index 39ee5e328..a84cbf491 100644 --- a/data_structures/src/builders.rs +++ b/data_structures/src/builders.rs @@ -10,10 +10,13 @@ use crate::{ SuperBlockVote, }, error::BuildersError, + get_protocol_version_activation_epoch, get_protocol_version_period, + proto::versioning::ProtocolVersion, + strum::IntoEnumIterator, transaction::Transaction, types::{ Address, Command, GetPeers, InventoryAnnouncement, InventoryRequest, IpAddress, LastBeacon, - Message, Peers, Verack, Version, + Message, Peers, ProtocolVersion as ProtocolVersionType, Verack, Version, }, }; @@ -59,6 +62,15 @@ impl Message { beacon: LastBeacon, ) -> Message { let addr = sender_addr.map(to_address); + + let mut protocol_versions = vec![]; + for protocol in ProtocolVersion::iter() { + protocol_versions.push(ProtocolVersionType { + version: protocol.into(), + activation_epoch: get_protocol_version_activation_epoch(protocol), + checkpoint_period: get_protocol_version_period(protocol), + }); + } Message::build_message( magic, Command::Version(Version { @@ -70,6 +82,7 @@ impl Message { user_agent: user_agent(), nonce: random_nonce(), beacon, + protocol_versions, }), ) } diff --git a/data_structures/src/proto/versioning.rs b/data_structures/src/proto/versioning.rs index 96cc5e10c..47bda8b17 100644 --- a/data_structures/src/proto/versioning.rs +++ b/data_structures/src/proto/versioning.rs @@ -21,7 +21,7 @@ use crate::{ }, ProtobufConvert, }, - types::Message, + types::{Message, ProtocolVersionName}, }; #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)] @@ -167,6 +167,16 @@ impl PartialOrd for ProtocolVersion { } } +impl From for ProtocolVersion { + fn from(version: ProtocolVersionName) -> Self { + match version { + ProtocolVersionName::V1_7(_) => ProtocolVersion::V1_7, + ProtocolVersionName::V1_8(_) => ProtocolVersion::V1_8, + ProtocolVersionName::V2_0(_) => ProtocolVersion::V2_0, + } + } +} + pub trait Versioned: ProtobufConvert { type LegacyType: protobuf::Message; diff --git a/data_structures/src/types.rs b/data_structures/src/types.rs index 10afcc31c..676fcb649 100644 --- a/data_structures/src/types.rs +++ b/data_structures/src/types.rs @@ -6,7 +6,9 @@ use serde::{Deserialize, Serialize}; use crate::{ chain::{Block, CheckpointBeacon, Hashable, InventoryEntry, SuperBlock, SuperBlockVote}, - proto::{schema::witnet, ProtobufConvert}, + proto::{ + schema::witnet, versioning::ProtocolVersion as VersioningProtocolVersion, ProtobufConvert, + }, transaction::Transaction, }; @@ -50,7 +52,26 @@ impl fmt::Display for Command { Command::GetPeers(_) => f.write_str("GET_PEERS"), Command::Peers(_) => f.write_str("PEERS"), Command::Verack(_) => f.write_str("VERACK"), - Command::Version(_) => f.write_str("VERSION"), + Command::Version(Version { + version: v, + sender_address: sa, + protocol_versions: pv, + .. + }) => { + let mut protocol_versions_str = String::from("("); + for protocol in pv { + protocol_versions_str.push_str(&format!( + "(version: {:?}, activation_epoch: {}, period: {}),", + protocol.version, protocol.activation_epoch, protocol.checkpoint_period + )); + } + protocol_versions_str.push_str(")"); + write!( + f, + "VERSION MESSAGE: version = {}, sender_address = {:?}, protocol_versions = {}", + v, sa, protocol_versions_str, + ) + } Command::Block(block) => write!( f, "BLOCK #{}: {}", @@ -112,6 +133,32 @@ pub struct Peers { #[protobuf_convert(pb = "witnet::Verack")] pub struct Verack; +#[derive(Clone, Copy, Debug, Eq, PartialEq, ProtobufConvert)] +#[protobuf_convert(pb = "witnet::ProtocolVersionName")] +pub enum ProtocolVersionName { + V1_7(bool), + V1_8(bool), + V2_0(bool), +} + +impl From for ProtocolVersionName { + fn from(version: VersioningProtocolVersion) -> Self { + match version { + VersioningProtocolVersion::V1_7 => ProtocolVersionName::V1_7(true), + VersioningProtocolVersion::V1_8 => ProtocolVersionName::V1_8(true), + VersioningProtocolVersion::V2_0 => ProtocolVersionName::V2_0(true), + } + } +} + +#[derive(Debug, Eq, PartialEq, Clone, ProtobufConvert)] +#[protobuf_convert(pb = "witnet::ProtocolVersion")] +pub struct ProtocolVersion { + pub version: ProtocolVersionName, + pub activation_epoch: u32, + pub checkpoint_period: u16, +} + #[derive(Debug, Eq, PartialEq, Clone, ProtobufConvert)] #[protobuf_convert(pb = "witnet::Version")] pub struct Version { @@ -123,6 +170,7 @@ pub struct Version { pub user_agent: String, pub nonce: u64, pub beacon: LastBeacon, + pub protocol_versions: Vec, } /////////////////////////////////////////////////////////// diff --git a/node/src/actors/chain_manager/handlers.rs b/node/src/actors/chain_manager/handlers.rs index b1425a3bb..86834e8b9 100644 --- a/node/src/actors/chain_manager/handlers.rs +++ b/node/src/actors/chain_manager/handlers.rs @@ -41,8 +41,8 @@ use crate::{ GetMempoolResult, GetNodeStats, GetProtocolInfo, GetReputation, GetReputationResult, GetSignalingInfo, GetState, GetSuperBlockVotes, GetSupplyInfo, GetUtxoInfo, IsConfirmedBlock, PeersBeacons, QueryStake, ReputationStats, Rewind, SendLastBeacon, - SessionUnitResult, SetLastBeacon, SetPeersLimits, SignalingInfo, SnapshotExport, - SnapshotImport, TryMineBlock, + SessionUnitResult, SetEpochConstants, SetLastBeacon, SetPeersLimits, SignalingInfo, + SnapshotExport, SnapshotImport, TryMineBlock, }, sessions_manager::SessionsManager, }, @@ -2132,6 +2132,16 @@ impl Handler for ChainManager { Box::pin(fut) } } + +impl Handler for ChainManager { + type Result = (); + + fn handle(&mut self, msg: SetEpochConstants, _ctx: &mut Context) -> Self::Result { + log::debug!("Received new epoch constants: {:?}", msg.epoch_constants); + self.epoch_constants = Some(msg.epoch_constants); + } +} + #[derive(Debug, Eq, PartialEq)] pub enum BlockBatches { TargetNotReached(Vec), diff --git a/node/src/actors/epoch_manager/handlers.rs b/node/src/actors/epoch_manager/handlers.rs index a7188a37b..2d3500471 100644 --- a/node/src/actors/epoch_manager/handlers.rs +++ b/node/src/actors/epoch_manager/handlers.rs @@ -80,6 +80,8 @@ impl Handler for EpochManager { type Result = (); fn handle(&mut self, msg: SetEpochConstants, _ctx: &mut Context) -> Self::Result { + log::debug!("Received new epoch constants: {:?}", msg.epoch_constants); + // Check if the epoch calculated with the current version of the epoch constants // and the last_checked_epoch are different and if they are, subtract that difference // from the new last_checked_epoch. diff --git a/node/src/actors/messages.rs b/node/src/actors/messages.rs index 82c5eadb0..a258180b4 100644 --- a/node/src/actors/messages.rs +++ b/node/src/actors/messages.rs @@ -34,7 +34,7 @@ use witnet_data_structures::{ UnstakeTransaction, VTTransaction, }, transaction_factory::NodeBalance, - types::LastBeacon, + types::{LastBeacon, ProtocolVersion}, utxo_pool::{UtxoInfo, UtxoSelectionStrategy}, wit::Wit, }; @@ -1472,6 +1472,19 @@ impl Message for crate::actors::messages::GetProtocolInfo { type Result = Result, failure::Error>; } +/// Message indicating the last beacon received from a peer +#[derive(Clone, Debug)] +pub struct SendProtocolVersions { + /// Socket address which identifies the peer + pub address: SocketAddr, + /// Protocol versions received from peer + pub protocol_versions: Vec, +} + +impl Message for SendProtocolVersions { + type Result = (); +} + #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] #[serde(untagged)] /// A value that can either be L, R, where an R can always be obtained through the `do_magic` method. diff --git a/node/src/actors/session/handlers.rs b/node/src/actors/session/handlers.rs index 67a8ddc94..405e6088c 100644 --- a/node/src/actors/session/handlers.rs +++ b/node/src/actors/session/handlers.rs @@ -13,12 +13,12 @@ use witnet_data_structures::{ chain::{ Block, CheckpointBeacon, Epoch, InventoryEntry, InventoryItem, SuperBlock, SuperBlockVote, }, - get_protocol_version, + get_protocol_version, get_protocol_version_activation_epoch, get_protocol_version_period, proto::versioning::{Versioned, VersionedHashable}, transaction::Transaction, types::{ Address, Command, InventoryAnnouncement, InventoryRequest, LastBeacon, - Message as WitnetMessage, Peers, Version, + Message as WitnetMessage, Peers, ProtocolVersion, ProtocolVersionName, Version, }, }; use witnet_p2p::sessions::{SessionStatus, SessionType}; @@ -32,8 +32,8 @@ use crate::actors::{ AddTransaction, CloseSession, Consolidate, EpochNotification, GetBlocksEpochRange, GetHighestCheckpointBeacon, GetItem, GetSuperBlockVotes, PeerBeacon, RemoveAddressesFromTried, RequestPeers, SendGetPeers, SendInventoryAnnouncement, - SendInventoryItem, SendInventoryRequest, SendLastBeacon, SendSuperBlockVote, - SessionUnitResult, + SendInventoryItem, SendInventoryRequest, SendLastBeacon, SendProtocolVersions, + SendSuperBlockVote, SessionUnitResult, }, peers_manager::PeersManager, sessions_manager::SessionsManager, @@ -75,6 +75,8 @@ enum HandshakeError { current_ts: i64, timestamp_diff: i64, }, + #[fail(display = "Received versions message has incompatible protocol version information")] + IncompatibleProtocolVersion {}, } /// Implement WriteHandler for Session @@ -182,16 +184,18 @@ impl StreamHandler> for Session { current_ts, self.current_epoch, ) { - Ok(msgs) => { + Ok((msgs, protocol_versions)) => { for msg in msgs { self.send_message(msg); } try_consolidate_session(self, ctx); + session_send_protocols_info(self, protocol_versions); } Err(err) => { if let HandshakeError::DifferentTimestamp { .. } - | HandshakeError::DifferentEpoch { .. } = err + | HandshakeError::DifferentEpoch { .. } + | HandshakeError::IncompatibleProtocolVersion {} = err { // Remove this address from tried bucket and ice it self.remove_and_ice_peer(); @@ -844,17 +848,53 @@ fn handshake_verack(session: &mut Session) { flags.verack_rx = true; } +// If a peer is on a protocol version with faster block times, epochs won't align +// Attempt to recaculate the epoch using the protocol version from the other peer +fn recalculate_epoch( + time_since_genesis: i64, + checkpoints_period: u16, + wit2_protocol_version: Option, +) -> Epoch { + match wit2_protocol_version { + Some(protocol) => { + let seconds_to_wit2 = protocol.activation_epoch * u32::from(checkpoints_period); + let seconds_since_wit2 = time_since_genesis as u32 - seconds_to_wit2; + let epochs_since_wit2 = seconds_since_wit2 / u32::from(protocol.checkpoint_period); + + protocol.activation_epoch + epochs_since_wit2 + } + None => (time_since_genesis / i64::from(checkpoints_period)) + .try_into() + .unwrap(), + } +} + fn check_beacon_compatibility( current_beacon: &LastBeacon, received_beacon: &LastBeacon, current_epoch: Epoch, + time_since_genesis: i64, + checkpoints_period: u16, target_superblock_beacon: Option, + wit2_protocol_version: Option, ) -> Result<(), HandshakeError> { if received_beacon.highest_block_checkpoint.checkpoint > current_epoch { - return Err(HandshakeError::DifferentEpoch { + let recalculated_epoch = recalculate_epoch( + time_since_genesis, + checkpoints_period, + wit2_protocol_version, + ); + log::debug!( + "Recalculated epoch {} -> {}", current_epoch, - received_beacon: received_beacon.clone(), - }); + recalculated_epoch + ); + if received_beacon.highest_block_checkpoint.checkpoint > recalculated_epoch { + return Err(HandshakeError::DifferentEpoch { + current_epoch, + received_beacon: received_beacon.clone(), + }); + } } // In order to improve the synchronization process, if we have information about the last @@ -910,6 +950,54 @@ fn check_beacon_compatibility( } } +fn check_protocol_version_compatibility( + received_protocol_versions: &Vec, +) -> Result, HandshakeError> { + let mut protocol_versions = vec![]; + for protocol in received_protocol_versions { + // Protocol version not activated yet + if protocol.activation_epoch == u32::MAX || protocol.checkpoint_period == u16::MAX { + log::debug!("Received inactive protocol {:?}", protocol.version); + continue; + } + + // Check already registered protocols for incompatibilities + let protocol_activation_epoch = + get_protocol_version_activation_epoch(protocol.version.into()); + let protocol_period = get_protocol_version_period(protocol.version.into()); + if protocol_activation_epoch != u32::MAX + && protocol_activation_epoch != protocol.activation_epoch + { + log::debug!( + "Received protocol {:?} with an incompatible activation epoch", + protocol.version + ); + return Err(HandshakeError::IncompatibleProtocolVersion {}); + } else if protocol_period != u16::MAX && protocol_period != protocol.checkpoint_period { + log::debug!( + "Received protocol {:?} with an incompatible activation epoch", + protocol.checkpoint_period + ); + return Err(HandshakeError::IncompatibleProtocolVersion {}); + } + + // Protocol already registered with the same parameters + if protocol.activation_epoch == protocol_activation_epoch + || protocol.checkpoint_period == protocol_period + { + log::debug!( + "Received already registered protocol {:?}", + protocol.version + ); + continue; + } + + protocol_versions.push(protocol.clone()); + } + + Ok(protocol_versions) +} + /// Check that the received timestamp is close enough to the current timestamp fn check_timestamp_drift( current_ts: i64, @@ -938,7 +1026,7 @@ fn handshake_version( command_version: &Version, current_ts: i64, current_epoch: Epoch, -) -> Result, HandshakeError> { +) -> Result<(Vec, Vec), HandshakeError> { // Check that the received timestamp is close enough to the current timestamp let received_ts = command_version.timestamp; let max_ts_diff = session.config.connections.handshake_max_ts_diff; @@ -948,13 +1036,30 @@ fn handshake_version( let current_beacon = &session.last_beacon; let received_beacon = &command_version.beacon; + let received_protocol_versions = &command_version.protocol_versions; + + let mut protocol_versions = vec![]; match session.session_type { SessionType::Outbound | SessionType::Feeler => { + protocol_versions = check_protocol_version_compatibility(received_protocol_versions)?; + let mut wit2_protocol_version = None; + for protocol in protocol_versions.iter() { + if protocol.version == ProtocolVersionName::V2_0(true) { + wit2_protocol_version = Some(protocol.clone()); + break; + } + } + + let time_since_genesis = + current_ts - session.config.consensus_constants.checkpoint_zero_timestamp; check_beacon_compatibility( current_beacon, received_beacon, current_epoch, + time_since_genesis, + session.config.consensus_constants.checkpoints_period, session.superblock_beacon_target, + wit2_protocol_version, )?; } // Do not check beacon for inbound peers @@ -989,7 +1094,7 @@ fn handshake_version( responses.push(version); } - Ok(responses) + Ok((responses, protocol_versions)) } fn send_inventory_item_msg(session: &mut Session, item: InventoryItem) { @@ -1140,6 +1245,13 @@ fn process_superblock_vote(_session: &mut Session, superblock_vote: SuperBlockVo chain_manager_addr.do_send(AddSuperBlockVote { superblock_vote }); } +fn session_send_protocols_info(session: &Session, protocol_versions: Vec) { + SessionsManager::from_registry().do_send(SendProtocolVersions { + address: session.remote_addr, + protocol_versions, + }) +} + #[cfg(test)] mod tests { use witnet_data_structures::chain::Hash; @@ -1167,7 +1279,15 @@ mod tests { let current_epoch = 0; assert_eq!( - check_beacon_compatibility(¤t_beacon, ¤t_beacon, current_epoch, None), + check_beacon_compatibility( + ¤t_beacon, + ¤t_beacon, + current_epoch, + 1_000_000_000, + 45, + None, + None + ), Ok(()) ); } @@ -1192,7 +1312,15 @@ mod tests { let current_epoch = 1; assert_eq!( - check_beacon_compatibility(¤t_beacon, ¤t_beacon, current_epoch, None), + check_beacon_compatibility( + ¤t_beacon, + ¤t_beacon, + current_epoch, + 1_000_000_000, + 45, + None, + None + ), Ok(()), ); } @@ -1229,11 +1357,27 @@ mod tests { // Both nodes can peer because they share the same superblock assert_eq!( - check_beacon_compatibility(¤t_beacon, &received_beacon, current_epoch, None), + check_beacon_compatibility( + ¤t_beacon, + &received_beacon, + current_epoch, + 1_000_000_000, + 45, + None, + None + ), Ok(()) ); assert_eq!( - check_beacon_compatibility(&received_beacon, ¤t_beacon, current_epoch, None), + check_beacon_compatibility( + &received_beacon, + ¤t_beacon, + current_epoch, + 1_000_000_000, + 45, + None, + None + ), Ok(()) ); } @@ -1273,7 +1417,15 @@ mod tests { // We cannot peer with the other node assert_eq!( - check_beacon_compatibility(¤t_beacon, &received_beacon, current_epoch, None), + check_beacon_compatibility( + ¤t_beacon, + &received_beacon, + current_epoch, + 1_000_000_000, + 45, + None, + None + ), Err(HandshakeError::PeerBeaconOld { current_beacon: current_beacon.highest_superblock_checkpoint, received_beacon: received_beacon.highest_superblock_checkpoint, @@ -1281,7 +1433,15 @@ mod tests { ); // But the other node can peer with us and start syncing assert_eq!( - check_beacon_compatibility(&received_beacon, ¤t_beacon, current_epoch, None), + check_beacon_compatibility( + &received_beacon, + ¤t_beacon, + current_epoch, + 1_000_000_000, + 45, + None, + None + ), Ok(()) ); } @@ -1318,7 +1478,15 @@ mod tests { // We cannot peer with the other node assert_eq!( - check_beacon_compatibility(¤t_beacon, &received_beacon, current_epoch, None), + check_beacon_compatibility( + ¤t_beacon, + &received_beacon, + current_epoch, + 1_000_000_000, + 45, + None, + None + ), Err(HandshakeError::PeerBeaconDifferentBlockHash { current_beacon: current_beacon.highest_superblock_checkpoint, received_beacon: received_beacon.highest_superblock_checkpoint, @@ -1326,7 +1494,15 @@ mod tests { ); // And the other node cannot peer with us assert_eq!( - check_beacon_compatibility(&received_beacon, ¤t_beacon, current_epoch, None), + check_beacon_compatibility( + &received_beacon, + ¤t_beacon, + current_epoch, + 1_000_000_000, + 45, + None, + None + ), Err(HandshakeError::PeerBeaconDifferentBlockHash { current_beacon: received_beacon.highest_superblock_checkpoint, received_beacon: current_beacon.highest_superblock_checkpoint, @@ -1442,4 +1618,47 @@ mod tests { received_ts ); } + + #[test] + fn test_epoch_recalculation() { + let wit2_protocol = ProtocolVersion { + version: ProtocolVersionName::V2_0(true), + activation_epoch: 40, + checkpoint_period: 20, + }; + + let checkpoints_period: u16 = 45; + + let time_since_genesis = i64::from(42 * checkpoints_period) + 16; + let recalculated_epoch = recalculate_epoch( + time_since_genesis, + checkpoints_period, + Some(wit2_protocol.clone()), + ); + assert_eq!(45, recalculated_epoch); + + let time_since_genesis = i64::from(42 * checkpoints_period) + 44; + let recalculated_epoch = recalculate_epoch( + time_since_genesis, + checkpoints_period, + Some(wit2_protocol.clone()), + ); + assert_eq!(46, recalculated_epoch); + + let time_since_genesis = i64::from(43 * checkpoints_period) + 14; + let recalculated_epoch = recalculate_epoch( + time_since_genesis, + checkpoints_period, + Some(wit2_protocol.clone()), + ); + assert_eq!(47, recalculated_epoch); + + let time_since_genesis = i64::from(43 * checkpoints_period) + 33; + let recalculated_epoch = recalculate_epoch( + time_since_genesis, + checkpoints_period, + Some(wit2_protocol.clone()), + ); + assert_eq!(48, recalculated_epoch); + } } diff --git a/node/src/actors/sessions_manager/beacons.rs b/node/src/actors/sessions_manager/beacons.rs index cd0bb03ff..0e26145bc 100644 --- a/node/src/actors/sessions_manager/beacons.rs +++ b/node/src/actors/sessions_manager/beacons.rs @@ -1,6 +1,24 @@ use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; -use witnet_data_structures::types::LastBeacon; +use witnet_data_structures::types::{LastBeacon, ProtocolVersion}; + +use failure::Fail; + +#[derive(Debug, Fail)] +pub enum NotSettingProtocolVersions { + #[fail(display = "Not setting protocol info because of lack of peers (still bootstrapping)")] + BootstrapNeeded, + #[fail( + display = "Not setting protocol info because not all peers sent matching protocol info vectors" + )] + MismatchingProtocolVersions, + #[fail(display = "Not setting protocol info because protocol info vector was empty")] + NoProtocolVersions, + #[fail( + display = "Not setting protocol info because not enough peers sent their protocol info vectors" + )] + NotEnoughBeacons, +} /// Stores the LastBeacons received from our peers, and also keeps track /// of the list of peers which have not sent us a beacon yet. @@ -16,6 +34,8 @@ pub struct Beacons { peers_not_beacon: HashSet, // Peers which have already sent us their beacon peers_with_beacon: HashMap, + // Peers which have already sent us their protocol info + peers_with_protocol_versions: HashMap>, } impl Beacons { @@ -29,10 +49,16 @@ impl Beacons { self.peers_with_beacon.len() } + /// Return number of peers which have sent us a beacon + pub fn peers_protocol_versions_received_count(&self) -> usize { + self.peers_with_protocol_versions.len() + } + /// Clear the existing lists of peers and start waiting for the new ones pub fn clear>(&mut self, peers: I) { self.peers_not_beacon.clear(); self.peers_with_beacon.clear(); + self.peers_with_protocol_versions.clear(); for socket_addr in peers { self.peers_not_beacon.insert(socket_addr); } @@ -57,11 +83,17 @@ impl Beacons { self.peers_with_beacon.insert(k, v); } + /// Insert protocol info vector. Overwrites already existing entries. + pub fn insert_protocol_versions(&mut self, k: SocketAddr, pi: Vec) { + self.peers_with_protocol_versions.insert(k, pi); + } + /// Remove beacon. Used when a peer disconnects before we reach consensus: /// we do not want to count that beacon pub fn remove(&mut self, k: &SocketAddr) { self.peers_not_beacon.remove(k); self.peers_with_beacon.remove(k); + self.peers_with_protocol_versions.remove(k); } /// When a new peer connects, we add it to the peers_not_beacon map, in order to @@ -84,6 +116,27 @@ impl Beacons { None } } + + pub fn check_protocol_versions( + &self, + ) -> Result, NotSettingProtocolVersions> { + let protocol_verions: Vec<_> = self.peers_with_protocol_versions.values().collect(); + + let mut protocols = protocol_verions.into_iter(); + let first_protocol = match protocols.next() { + Some(protocol) => protocol, + None => return Err(NotSettingProtocolVersions::NoProtocolVersions), + }; + if protocols.all(|protocol| protocol == first_protocol) { + log::debug!("All received protocol version vectors match"); + + Ok(first_protocol.to_vec()) + } else { + log::debug!("Received protocol version vectors do not match"); + + Err(NotSettingProtocolVersions::MismatchingProtocolVersions) + } + } } #[cfg(test)] diff --git a/node/src/actors/sessions_manager/handlers.rs b/node/src/actors/sessions_manager/handlers.rs index ebbd758a3..d949ccc38 100644 --- a/node/src/actors/sessions_manager/handlers.rs +++ b/node/src/actors/sessions_manager/handlers.rs @@ -18,9 +18,9 @@ use crate::actors::{ messages::{ AddConsolidatedPeer, AddPeers, Anycast, Broadcast, Consolidate, Create, DropAllPeers, DropOutboundPeers, EpochNotification, GetConsolidatedPeers, LogMessage, NumSessions, - NumSessionsResult, PeerBeacon, Register, RemoveAddressesFromTried, SessionsUnitResult, - SetEpochConstants, SetLastBeacon, SetPeersLimits, SetSuperBlockTargetBeacon, TryMineBlock, - Unregister, + NumSessionsResult, PeerBeacon, Register, RemoveAddressesFromTried, SendProtocolVersions, + SessionsUnitResult, SetEpochConstants, SetLastBeacon, SetPeersLimits, + SetSuperBlockTargetBeacon, TryMineBlock, Unregister, }, peers_manager::PeersManager, session::Session, @@ -463,6 +463,21 @@ impl Handler for SessionsManager { } } +impl Handler for SessionsManager { + type Result = (); + + fn handle(&mut self, msg: SendProtocolVersions, _ctx: &mut Context) { + self.beacons + .insert_protocol_versions(msg.address, msg.protocol_versions); + + // Check if we have all the beacons, and sent PeersBeacons message to ChainManager + match self.try_set_protocol_versions() { + Ok(()) => {} + Err(e) => log::debug!("{}", e), + } + } +} + impl Handler for SessionsManager { type Result = ::Result; diff --git a/node/src/actors/sessions_manager/mod.rs b/node/src/actors/sessions_manager/mod.rs index 114ca7054..8ba4f8118 100644 --- a/node/src/actors/sessions_manager/mod.rs +++ b/node/src/actors/sessions_manager/mod.rs @@ -17,10 +17,11 @@ use crate::{ epoch_manager::EpochManager, messages::{ Anycast, CloseSession, GetEpochConstants, GetRandomPeers, OutboundTcpConnect, - PeersBeacons, PeersSocketAddrsResult, SendGetPeers, Subscribe, + PeersBeacons, PeersSocketAddrsResult, SendGetPeers, SetEpochConstants, Subscribe, }, peers_manager::PeersManager, session::Session, + sessions_manager::beacons::NotSettingProtocolVersions, }, utils::stop_system_if_panicking, }; @@ -28,7 +29,10 @@ use failure::Fail; use witnet_config::config::Config; use witnet_data_structures::{ chain::{CheckpointBeacon, Epoch, EpochConstants}, - types::LastBeacon, + get_protocol_version_activation_epoch, get_protocol_version_period, + proto::versioning::ProtocolVersion, + register_protocol_version, + types::{LastBeacon, ProtocolVersionName}, }; mod actor; @@ -230,6 +234,71 @@ impl SessionsManager { Ok(()) } + fn try_set_protocol_versions(&mut self) -> Result<(), NotSettingProtocolVersions> { + // Do not send PeersBeacons until we get to the outbound limit + if self.sessions.is_outbound_bootstrap_needed() { + return Err(NotSettingProtocolVersions::BootstrapNeeded); + } + + // We may have 0 beacons out of 0 + // We actually want to check it against the outbound limit + let expected_peers = self + .sessions + .outbound_consolidated + .limit + .map(|x| x as usize); + if Some(self.beacons.peers_protocol_versions_received_count()) < expected_peers { + return Err(NotSettingProtocolVersions::NotEnoughBeacons); + } + + // Check all peers have sent us the same protocol info vector + match self.beacons.check_protocol_versions() { + Ok(protocols) => { + let mut wit2_registered = false; + for protocol in protocols { + register_protocol_version( + protocol.version.into(), + protocol.activation_epoch, + protocol.checkpoint_period, + ); + if protocol.version == ProtocolVersionName::V2_0(true) { + wit2_registered = true; + } + } + + if wit2_registered { + log::debug!("Updating epoch constants"); + + let config = self.config.as_ref().expect("Config should be set"); + let checkpoint_zero_timestamp_wit2 = config + .consensus_constants + .checkpoint_zero_timestamp + + i64::from(get_protocol_version_activation_epoch(ProtocolVersion::V2_0)) + * i64::from(config.consensus_constants.checkpoints_period); + + let epoch_constants = EpochConstants { + checkpoint_zero_timestamp: config + .consensus_constants + .checkpoint_zero_timestamp, + checkpoints_period: config.consensus_constants.checkpoints_period, + checkpoint_zero_timestamp_wit2, + checkpoints_period_wit2: get_protocol_version_period(ProtocolVersion::V2_0), + }; + self.epoch_constants = Some(epoch_constants); + + log::debug!("Sending epoch constants to chain manager"); + ChainManager::from_registry().do_send(SetEpochConstants { epoch_constants }); + + log::debug!("Sending epoch constants to epoch manager"); + EpochManager::from_registry().do_send(SetEpochConstants { epoch_constants }); + } + } + Err(e) => return Err(e), + } + + Ok(()) + } + /// Send PeersBeacons message to peers manager fn send_peers_beacons(&mut self, ctx: &mut Context) { let (pb, pnb) = match self.beacons.send() { diff --git a/schemas/witnet/witnet.proto b/schemas/witnet/witnet.proto index fc0d7c044..8889d2107 100644 --- a/schemas/witnet/witnet.proto +++ b/schemas/witnet/witnet.proto @@ -46,6 +46,20 @@ message Message { Command kind = 2; } +message ProtocolVersionName { + oneof kind { + bool V1_7 = 1; + bool V1_8 = 2; + bool V2_0 = 3; + } +} + +message ProtocolVersion { + ProtocolVersionName version = 1; + uint32 activation_epoch = 2; + uint32 checkpoint_period = 3; +} + message Version { uint32 version = 1; int64 timestamp = 2; @@ -55,6 +69,7 @@ message Version { string user_agent = 6; fixed64 nonce = 7; LastBeacon beacon = 8; + repeated ProtocolVersion protocol_versions = 9; } message Verack {