Skip to content

Commit

Permalink
add test and api
Browse files Browse the repository at this point in the history
  • Loading branch information
jolestar committed Aug 24, 2021
1 parent 95a7565 commit a43acb4
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 21 deletions.
2 changes: 2 additions & 0 deletions cmd/starcoin/src/node/reset_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use scmd::{CommandAction, ExecContext};
use starcoin_crypto::HashValue;
use structopt::StructOpt;

/// Reset the node chain to block of `block-hash`, and clean all blocks after the block.
/// Note: this command may broken the block database in node.
#[derive(Debug, StructOpt)]
#[structopt(name = "reset")]
pub struct ResetOpt {
Expand Down
2 changes: 2 additions & 0 deletions node/api/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use futures::channel::oneshot::Receiver;
use starcoin_crypto::HashValue;
use starcoin_service_registry::{ServiceInfo, ServiceRequest, ServiceStatus};

Expand All @@ -22,6 +23,7 @@ pub enum NodeRequest {
pub enum NodeResponse {
Services(Vec<ServiceInfo>),
Result(Result<()>),
AsyncResult(Receiver<Result<()>>),
ServiceStatus(ServiceStatus),
}

Expand Down
5 changes: 4 additions & 1 deletion node/api/src/node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ where
Ok(())
}
async fn reset_node(&self, block_id: HashValue) -> Result<()> {
self.try_send(NodeRequest::ResetNode(block_id))?;
let response = self.send(NodeRequest::ResetNode(block_id)).await??;
if let NodeResponse::AsyncResult(receiver) = response {
return receiver.await?;
}
Ok(())
}
async fn delete_block(&self, block_id: HashValue) -> Result<()> {
Expand Down
19 changes: 10 additions & 9 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ use starcoin_storage::{BlockStore, Storage};
use starcoin_stratum::service::{StratumService, StratumServiceFactory};
use starcoin_stratum::stratum::{Stratum, StratumFactory};
use starcoin_sync::announcement::AnnouncementService;
use starcoin_sync::block_connector::BlockConnectorService;
use starcoin_sync::block_connector::{BlockConnectorService, ResetRequest};
use starcoin_sync::sync::SyncService;
use starcoin_sync::txn_sync::TxnSyncService;
use starcoin_txpool::TxPoolActorService;
use starcoin_types::startup_info::StartupInfo;
use starcoin_types::system_events::SystemStarted;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -72,7 +71,7 @@ impl ServiceHandler<Self, NodeRequest> for NodeService {
fn handle(
&mut self,
msg: NodeRequest,
_ctx: &mut ServiceContext<NodeService>,
ctx: &mut ServiceContext<NodeService>,
) -> Result<NodeResponse> {
Ok(match msg {
NodeRequest::ListService => NodeResponse::Services(self.registry.list_service_sync()?),
Expand Down Expand Up @@ -119,12 +118,14 @@ impl ServiceHandler<Self, NodeRequest> for NodeService {
.start_service_sync(GenerateBlockEventPacemaker::service_name()),
),
NodeRequest::ResetNode(block_hash) => {
let storage = self
.registry
.get_shared_sync::<Arc<Storage>>()
.expect("Storage must exist.");
info!("Prepare to reset node startup info to {}", block_hash);
NodeResponse::Result(storage.save_startup_info(StartupInfo { main: block_hash }))
let connect_service = ctx.service_ref::<BlockConnectorService>()?.clone();
let fut = async move {
info!("Prepare to reset node startup info to {}", block_hash);
let result = connect_service.send(ResetRequest { block_hash }).await?;
result
};
let receiver = ctx.exec(fut);
NodeResponse::AsyncResult(receiver)
}
NodeRequest::DeleteBlock(block_hash) => {
let storage = self
Expand Down
2 changes: 1 addition & 1 deletion rpc/api/src/node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub trait NodeManagerApi {
#[rpc(name = "node_manager.shutdown_system")]
fn shutdown_system(&self) -> FutureResult<()>;
#[rpc(name = "node_manager.reset_to_block")]
fn reset_to_block(&self, block_number: HashValue) -> FutureResult<()>;
fn reset_to_block(&self, block_hash: HashValue) -> FutureResult<()>;

// /// Delete block data in [start_number, end_number)
// #[rpc(name = "node_manager.delete_block_range")]
Expand Down
16 changes: 14 additions & 2 deletions sync/src/block_connector/block_connector_service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) The Starcoin Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::block_connector::WriteBlockChainService;
use crate::block_connector::{ResetRequest, WriteBlockChainService};
use crate::sync::{CheckSyncEvent, SyncService};
use crate::tasks::BlockConnectedEvent;
use anyhow::{format_err, Result};
Expand All @@ -10,7 +10,9 @@ use logger::prelude::*;
use network::NetworkServiceRef;
use network_api::PeerProvider;
use starcoin_chain_api::{ConnectBlockError, WriteableChainService};
use starcoin_service_registry::{ActorService, EventHandler, ServiceContext, ServiceFactory};
use starcoin_service_registry::{
ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler,
};
use starcoin_storage::{BlockStore, Storage};
use starcoin_sync_api::PeerNewBlock;
use starcoin_types::sync_status::SyncStatus;
Expand Down Expand Up @@ -164,3 +166,13 @@ impl EventHandler<Self, PeerNewBlock> for BlockConnectorService {
}
}
}

impl ServiceHandler<Self, ResetRequest> for BlockConnectorService {
fn handle(
&mut self,
msg: ResetRequest,
_ctx: &mut ServiceContext<BlockConnectorService>,
) -> Result<()> {
self.chain_service.reset(msg.block_hash)
}
}
12 changes: 12 additions & 0 deletions sync/src/block_connector/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright (c) The Starcoin Core Contributors
// SPDX-License-Identifier: Apache-2.0

use starcoin_crypto::HashValue;
use starcoin_service_registry::ServiceRequest;

mod block_connector_service;
mod metrics;
#[cfg(test)]
Expand All @@ -18,3 +21,12 @@ pub use test_write_block_chain::create_writeable_block_chain;
pub use test_write_block_chain::gen_blocks;
#[cfg(test)]
pub use test_write_block_chain::new_block;

#[derive(Debug, Clone)]
pub struct ResetRequest {
pub block_hash: HashValue,
}

impl ServiceRequest for ResetRequest {
type Response = anyhow::Result<()>;
}
37 changes: 37 additions & 0 deletions sync/src/block_connector/test_write_block_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,40 @@ async fn test_block_chain_switch_main() {
2 * times
);
}

#[stest::test]
async fn test_block_chain_reset() -> anyhow::Result<()> {
let times = 10;
let (mut writeable_block_chain_service, node_config, _) = create_writeable_block_chain().await;
let net = node_config.net();
gen_blocks(
times,
&mut writeable_block_chain_service,
net.time_service().as_ref(),
);
assert_eq!(
writeable_block_chain_service
.get_main()
.current_header()
.number(),
times
);
let block = writeable_block_chain_service
.get_main()
.get_block_by_number(3)?
.unwrap();
writeable_block_chain_service.reset(block.id())?;
assert_eq!(
writeable_block_chain_service
.get_main()
.current_header()
.number(),
3
);

assert!(writeable_block_chain_service
.get_main()
.get_block_by_number(2)?
.is_some());
Ok(())
}
16 changes: 8 additions & 8 deletions sync/src/block_connector/write_block_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,20 +170,20 @@ where

/// Reset the node to `block_id`, and replay blocks after the block
pub fn reset(&mut self, block_id: HashValue) -> Result<()> {
let new_head_block = self
.main
.get_block(block_id)?
.ok_or_else(|| format_err!("Can not find block {} in main chain", block_id,))?;
let new_branch = BlockChain::new(
self.config.net().time_service(),
block_id,
self.storage.clone(),
)?;
let ancestor = self.main.find_ancestor(&new_branch)?.ok_or_else(|| {
format_err!(
"Can not find ancestors between main chain: {:?} and branch: {:?}",
self.main.status(),
new_branch.status()
)
})?;

// delete block since from block.number + 1 to latest.
let start = new_head_block.header().number().saturating_add(1);
let latest = self.main.status().head.number();
for block_number in ancestor.number..latest {
for block_number in start..latest {
if let Some(block) = self.main.get_block_by_number(block_number)? {
info!("Delete block({:?})", block.header);
self.storage.delete_block(block.id())?;
Expand Down

0 comments on commit a43acb4

Please sign in to comment.