Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc&&miner: Update miner && related apis #2534

Merged
merged 1 commit into from
May 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 3 additions & 16 deletions cmd/miner_client/src/job_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl JobRpcClient {
nonce,
hex::encode(extra.to_vec()),
) {
error!("Submit seal error: {}", e);
warn!("Submit seal error: {}", e);
Delay::new(Duration::from_secs(1)).await;
}
}
Expand All @@ -58,23 +58,10 @@ impl JobRpcClient {
let mut stream = stream.into_stream();
while let Some(item) = stream.next().await {
match item {
Ok(b) => {
let blob = match hex::decode(b.minting_blob) {
Ok(blob) => blob,
Err(e) => {
error!("Invalid blob:{}", e);
continue;
}
};
let event = MintBlockEvent::new(
b.strategy,
blob,
b.difficulty,
b.block_number,
);
Ok(event) => {
info!(
"Receive mint event, minting_blob: {}, difficulty: {}",
hex::encode(event.minting_blob.as_slice()),
hex::encode(&event.minting_blob),
event.difficulty
);
let _ = sender.unbounded_send(event);
Expand Down
35 changes: 18 additions & 17 deletions miner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,43 @@ edition = "2018"
anyhow = "1.0.40"
futures = "0.3.12"
futures-timer = "3.0"
hex = { version = "0.4.3", default-features = false }
hex = { version = "0.4.3" }
thiserror = "1.0"
once_cell = "1.7.2"
parking_lot = "0.11.1"

starcoin-config = { path = "../config" }
starcoin-chain = {path = "../chain"}
types = {path = "../types", package="starcoin-types" }
starcoin-state-api = {path = "../state/api"}
statedb = {path = "../state/statedb", package="starcoin-statedb" }
state_tree = {path = "../state/state-tree", package="starcoin-state-tree" }
consensus = {path = "../consensus", package="starcoin-consensus" }
starcoin-chain = { path = "../chain" }
types = { path = "../types", package = "starcoin-types" }
starcoin-state-api = { path = "../state/api" }
statedb = { path = "../state/statedb", package = "starcoin-statedb" }
state_tree = { path = "../state/state-tree", package = "starcoin-state-tree" }
consensus = { path = "../consensus", package = "starcoin-consensus" }
starcoin-storage = { path = "../storage" }
executor = { path = "../executor", package = "starcoin-executor" }
starcoin-txpool = { path = "../txpool" }
starcoin-txpool-api = { path = "../txpool/api" }
tokio = { version = "0.2", features = ["full"] }
logger = {path = "../commons/logger", package="starcoin-logger"}
crypto = { package="starcoin-crypto", path = "../commons/crypto"}
starcoin-accumulator = {path = "../commons/accumulator", package="starcoin-accumulator"}
logger = { path = "../commons/logger", package = "starcoin-logger" }
crypto = { package = "starcoin-crypto", path = "../commons/crypto" }
starcoin-accumulator = { path = "../commons/accumulator", package = "starcoin-accumulator" }
starcoin-account-api = { path = "../account/api" }
starcoin-account-service = { path = "../account/service" }
starcoin-metrics = { path = "../commons/metrics" }
starcoin-miner-client = { path = "../cmd/miner_client" }
bcs-ext = { package="bcs-ext", path = "../commons/bcs_ext" }
bcs-ext = { package = "bcs-ext", path = "../commons/bcs_ext" }
starcoin-vm-types = { path = "../vm/types" }
starcoin-open-block = { path = "../chain/open-block" }
starcoin-service-registry = { path = "../commons/service-registry" }
serde = "1.0.126"

[dev-dependencies]
sync = {path = "../sync", package="starcoin-sync" }
starcoin-genesis = {path = "../genesis"}
starcoin-sync-api = {package="starcoin-sync-api", path="../sync/api"}
sync = { path = "../sync", package = "starcoin-sync" }
starcoin-genesis = { path = "../genesis" }
starcoin-sync-api = { package = "starcoin-sync-api", path = "../sync/api" }
stest = { path = "../commons/stest" }
network-rpc = {path="../network-rpc", package = "starcoin-network-rpc"}
starcoin-network-rpc-api = {path="../network-rpc/api"}
starcoin-state-service = {path="../state/service"}
network-rpc = { path = "../network-rpc", package = "starcoin-network-rpc" }
starcoin-network-rpc-api = { path = "../network-rpc/api" }
starcoin-state-service = { path = "../state/service" }
starcoin-node = { path = "../node" }
test-helper = { path = "../test-helper" }
21 changes: 15 additions & 6 deletions miner/src/job_bus_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{BlockHeaderExtra, MintBlockEvent, SubmitSealEvent};
use crate::{BlockHeaderExtra, MinerService, MintBlockEvent, SubmitSealRequest};
use anyhow::Result;
use futures::executor::block_on;
use futures::stream::BoxStream;
Expand All @@ -13,11 +13,20 @@ use std::sync::Arc;
pub struct JobBusClient {
bus: ServiceRef<BusService>,
time_service: Arc<dyn TimeService>,
miner_service: ServiceRef<MinerService>,
}

impl JobBusClient {
pub fn new(bus: ServiceRef<BusService>, time_service: Arc<dyn TimeService>) -> Self {
Self { bus, time_service }
pub fn new(
miner_service: ServiceRef<MinerService>,
bus: ServiceRef<BusService>,
time_service: Arc<dyn TimeService>,
) -> Self {
Self {
bus,
time_service,
miner_service,
}
}
}

Expand All @@ -33,9 +42,9 @@ impl JobClient for JobBusClient {
nonce: u32,
extra: BlockHeaderExtra,
) -> Result<()> {
self.bus
.broadcast(SubmitSealEvent::new(minting_blob, nonce, extra))?;
Ok(())
self.miner_service
.try_send(SubmitSealRequest::new(minting_blob, nonce, extra))
.map_err(|e| e.into())
}

fn time_service(&self) -> Arc<dyn TimeService> {
Expand Down
149 changes: 91 additions & 58 deletions miner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,28 @@ mod metrics;
pub mod task;

pub use create_block_template::{CreateBlockTemplateRequest, CreateBlockTemplateService};
use crypto::HashValue;
pub use starcoin_miner_client::miner::{MinerClient, MinerClientService};
use std::fmt;
use thiserror::Error;
pub use types::block::BlockHeaderExtra;
pub use types::system_events::{GenerateBlockEvent, MinedBlock, MintBlockEvent, SubmitSealEvent};
pub use types::system_events::{GenerateBlockEvent, MinedBlock, MintBlockEvent};

#[derive(Debug, Error)]
pub enum MinerError {
#[error("Mint task is empty Error")]
TaskEmptyError,
#[error("Mint task is mismatch Error, current blob: {current}, got blob: {real}")]
TaskMisMatchError { current: String, real: String },
}

#[derive(Debug)]
pub enum MinerClientSubscribeRequest {
Add(u32),
Remove(u32),
pub struct UpdateSubscriberNumRequest {
pub number: Option<u32>,
}

impl ServiceRequest for MinerClientSubscribeRequest {
type Response = Result<Option<MintBlockEvent>>;
impl ServiceRequest for UpdateSubscriberNumRequest {
type Response = Option<MintBlockEvent>;
}

pub struct MinerService {
Expand All @@ -43,27 +53,55 @@ pub struct MinerService {
client_subscribers_num: u32,
}

impl ServiceHandler<Self, MinerClientSubscribeRequest> for MinerService {
impl ServiceRequest for SubmitSealRequest {
type Response = Result<HashValue>;
}

#[derive(Clone, Debug)]
pub struct SubmitSealRequest {
pub nonce: u32,
pub extra: BlockHeaderExtra,
pub minting_blob: Vec<u8>,
}

impl fmt::Display for SubmitSealRequest {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(
fmt,
"Seal{{nonce:{}, extra:{}, blob:{}}}",
self.nonce,
self.extra,
hex::encode(&self.minting_blob)
)
}
}

impl SubmitSealRequest {
pub fn new(minting_blob: Vec<u8>, nonce: u32, extra: BlockHeaderExtra) -> Self {
Self {
minting_blob,
nonce,
extra,
}
}
}

impl ServiceHandler<Self, UpdateSubscriberNumRequest> for MinerService {
fn handle(
&mut self,
msg: MinerClientSubscribeRequest,
req: UpdateSubscriberNumRequest,
_ctx: &mut ServiceContext<MinerService>,
) -> Result<Option<MintBlockEvent>> {
match msg {
MinerClientSubscribeRequest::Add(num) => {
self.client_subscribers_num = num;
Ok(self.current_task.as_ref().map(|task| MintBlockEvent {
strategy: task.block_template.strategy,
minting_blob: task.minting_blob.clone(),
difficulty: task.block_template.difficulty,
block_number: task.block_template.number,
}))
}
MinerClientSubscribeRequest::Remove(num) => {
self.client_subscribers_num = num;
Ok(None)
}
) -> Option<MintBlockEvent> {
if let Some(num) = req.number {
self.client_subscribers_num = num;
}
self.current_task.as_ref().map(|task| MintBlockEvent {
parent_hash: task.block_template.parent_hash,
strategy: task.block_template.strategy,
minting_blob: task.minting_blob.clone(),
difficulty: task.block_template.difficulty,
block_number: task.block_template.number,
})
}
}

Expand All @@ -84,23 +122,26 @@ impl ServiceFactory<MinerService> for MinerService {
impl ActorService for MinerService {
fn started(&mut self, ctx: &mut ServiceContext<Self>) -> Result<()> {
ctx.subscribe::<GenerateBlockEvent>();
ctx.subscribe::<SubmitSealEvent>();
Ok(())
}

fn stopped(&mut self, ctx: &mut ServiceContext<Self>) -> Result<()> {
ctx.unsubscribe::<GenerateBlockEvent>();
ctx.unsubscribe::<SubmitSealEvent>();
Ok(())
}
}

impl EventHandler<Self, SubmitSealEvent> for MinerService {
fn handle_event(&mut self, event: SubmitSealEvent, ctx: &mut ServiceContext<MinerService>) {
if let Err(e) = self.finish_task(event.nonce, event.extra, event.minting_blob.clone(), ctx)
{
error!("Process SubmitSealEvent {:?} fail: {:?}", event, e);
}
impl ServiceHandler<Self, SubmitSealRequest> for MinerService {
fn handle(
&mut self,
req: SubmitSealRequest,
ctx: &mut ServiceContext<MinerService>,
) -> Result<HashValue> {
self.finish_task(req.nonce, req.extra, req.minting_blob.clone(), ctx)
.map_err(|e| {
warn!(target: "miner", "process seal: {} failed: {}", req, e);
e
})
}
}

Expand All @@ -122,6 +163,7 @@ impl MinerService {
let difficulty = block_template.difficulty;
let strategy = block_template.strategy;
let number = block_template.number;
let parent_hash = block_template.parent_hash;
let task = MintTask::new(block_template);
let mining_blob = task.minting_blob.clone();
if let Some(current_task) = self.current_task.as_ref() {
Expand All @@ -132,6 +174,7 @@ impl MinerService {
}
self.current_task = Some(task);
ctx.broadcast(MintBlockEvent::new(
parent_hash,
strategy,
mining_blob,
difficulty,
Expand All @@ -147,48 +190,38 @@ impl MinerService {
extra: BlockHeaderExtra,
minting_blob: Vec<u8>,
ctx: &mut ServiceContext<MinerService>,
) -> Result<()> {
) -> Result<HashValue> {
match self.current_task.as_ref() {
None => {
debug!(
"MintTask is none, but got nonce: {}, extra:{:?} for minting_blob: {:?}, may be mint by other client.",
nonce, extra, minting_blob,
);
return Ok(());
}
Some(task) => {
if task.minting_blob != minting_blob {
info!(
"[miner] Jobs hash mismatch expect: {}, got: {}, probably received old job result.",
hex::encode(task.minting_blob.as_slice()),
hex::encode(minting_blob.as_slice())
);
return Ok(());
}
if let Err(e) = task.block_template.strategy.verify_blob(
return Err(MinerError::TaskMisMatchError {
current: hex::encode(&task.minting_blob),
real: hex::encode(minting_blob),
}
.into());
};
task.block_template.strategy.verify_blob(
task.minting_blob.clone(),
nonce,
extra,
task.block_template.difficulty,
) {
warn!(
"Failed to verify blob: {}, nonce: {}, err: {}",
hex::encode(task.minting_blob.as_slice()),
nonce,
e
);
return Ok(());
}
)?
}
None => {
return Err(MinerError::TaskEmptyError.into());
}
}

if let Some(task) = self.current_task.take() {
let block = task.finish(nonce, extra);
info!("Mint new block: {}", block);
let block_hash = block.id();
info!(target: "miner", "Mint new block: {}", block);
ctx.broadcast(MinedBlock(Arc::new(block)));
MINER_METRICS.block_mint_count.inc();
Ok(block_hash)
} else {
Err(MinerError::TaskEmptyError.into())
}
Ok(())
}

pub fn is_minting(&self) -> bool {
Expand Down
Loading