Skip to content

Commit

Permalink
Fix mint memory leak issue (#3351)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanlee42 authored Apr 19, 2022
1 parent 0cab541 commit 8a741cb
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 77 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions cmd/miner_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ starcoin-miner = { path = "../../miner" }
starcoin-miner-client-api = { path = "./api" }
stest = { path = "../../commons/stest" }
clap = { version = "3", features = ["derive"] }
async-trait = "0.1.53"

[[bin]]
name = "starcoin_miner"
path = "src/main.rs"
Expand Down
1 change: 1 addition & 0 deletions cmd/miner_client/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ starcoin-types = { path = "../../../types" }
dyn-clone = "1.0.5"
futures = "0.3.12"
anyhow = "1.0.41"
async-trait = "0.1.53"
17 changes: 10 additions & 7 deletions cmd/miner_client/src/job_bus_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{BlockHeaderExtra, JobClient, MintBlockEvent, SealEvent};
use anyhow::Result;
use futures::executor::block_on;
use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::stream::StreamExt;
use starcoin_miner::{MinerService, SubmitSealRequest};
Expand Down Expand Up @@ -30,20 +30,23 @@ impl JobBusClient {
}
}

#[async_trait]
impl JobClient for JobBusClient {
fn subscribe(&self) -> Result<BoxStream<'static, MintBlockEvent>> {
async fn subscribe(&self) -> Result<BoxStream<'static, MintBlockEvent>> {
let bus = self.bus.clone();
block_on(async move { bus.channel::<MintBlockEvent>().await.map(|s| s.boxed()) })
bus.channel::<MintBlockEvent>().await.map(|s| s.boxed())
}

fn submit_seal(&self, seal: SealEvent) -> Result<()> {
async fn submit_seal(&self, seal: SealEvent) -> Result<()> {
let extra = match &seal.extra {
None => BlockHeaderExtra::default(),
Some(extra) => extra.extra,
};
self.miner_service
.try_send(SubmitSealRequest::new(seal.minting_blob, seal.nonce, extra))
.map_err(|e| e.into())
let _ = self
.miner_service
.send(SubmitSealRequest::new(seal.minting_blob, seal.nonce, extra))
.await??;
Ok(())
}

fn time_service(&self) -> Arc<dyn TimeService> {
Expand Down
7 changes: 4 additions & 3 deletions cmd/miner_client/src/job_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::{JobClient, SealEvent};
use anyhow::Result;
use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::{stream::StreamExt, Future, TryStreamExt};
use futures_channel::mpsc::{unbounded, UnboundedSender};
Expand All @@ -14,7 +15,6 @@ use starcoin_types::block::BlockHeaderExtra;
use starcoin_types::system_events::MintBlockEvent;
use std::sync::Arc;
use std::time::Duration;

#[derive(Clone)]
pub struct JobRpcClient {
rpc_client: Arc<RpcClient>,
Expand Down Expand Up @@ -96,12 +96,13 @@ impl JobRpcClient {
}
}

#[async_trait]
impl JobClient for JobRpcClient {
fn subscribe(&self) -> Result<BoxStream<'static, MintBlockEvent>> {
async fn subscribe(&self) -> Result<BoxStream<'static, MintBlockEvent>> {
Ok(self.forward_mint_block_stream())
}

fn submit_seal(&self, seal: SealEvent) -> Result<()> {
async fn submit_seal(&self, seal: SealEvent) -> Result<()> {
let extra = match &seal.extra {
None => BlockHeaderExtra::default(),
Some(extra) => extra.extra,
Expand Down
8 changes: 4 additions & 4 deletions cmd/miner_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ pub mod miner;
mod solver;
pub mod stratum_client;
pub mod stratum_client_service;

use anyhow::Result;
use async_trait::async_trait;
use futures::stream::BoxStream;
use starcoin_config::TimeService;
use starcoin_types::system_events::SealEvent;
Expand All @@ -20,9 +20,9 @@ pub use starcoin_types::{
};
use std::sync::Arc;

#[async_trait]
pub trait JobClient: Send + Unpin + Sync + Clone {
fn subscribe(&self) -> Result<BoxStream<'static, MintBlockEvent>>;

fn submit_seal(&self, seal: SealEvent) -> Result<()>;
async fn subscribe(&self) -> Result<BoxStream<'static, MintBlockEvent>>;
async fn submit_seal(&self, seal: SealEvent) -> Result<()>;
fn time_service(&self) -> Arc<dyn TimeService>;
}
2 changes: 1 addition & 1 deletion cmd/miner_client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fn main() {
let system = System::with_tokio_rt(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.on_thread_stop(|| println!("starcoin-miner thread stopped"))
.on_thread_stop(|| info!("starcoin-miner thread stopped"))
.thread_name("starcoin-miner")
.build()
.expect("failed to create tokio runtime for starcoin-miner")
Expand Down
71 changes: 35 additions & 36 deletions cmd/miner_client/src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,37 +36,6 @@ impl<C: JobClient> MinerClient<C> {
current_task: None,
})
}
fn submit_seal(&self, seal: SealEvent) {
if let Err(err) = self.job_client.submit_seal(seal) {
error!("Submit seal to failed: {}", err);
return;
}
{
*self.num_seals_found.lock() += 1;
let msg = format!(
"Miner client Total seals found: {:>3}",
*self.num_seals_found.lock()
);
info!("{}", msg)
}
}

fn start_mint_work(&mut self, event: MintBlockEvent) {
let (stop_tx, stop_rx) = unbounded();
if let Some(mut task) = self.current_task.take() {
if let Err(e) = block_on(task.send(true)) {
debug!(
"Failed to send stop event to current task, may be finished:{:?}",
e
);
};
}
self.current_task = Some(stop_tx);
let nonce_tx = self.nonce_tx.clone();
let mut solver = dyn_clone::clone_box(&*self.solver);
//this will block on handle Sealevent if use actix spawn
thread::spawn(move || solver.solve(event, nonce_tx, stop_rx));
}
}

pub struct MinerClientService<C: JobClient> {
Expand All @@ -75,7 +44,8 @@ pub struct MinerClientService<C: JobClient> {

impl<C: JobClient> ActorService for MinerClientService<C> {
fn started(&mut self, ctx: &mut ServiceContext<Self>) -> Result<()> {
let jobs = self.inner.job_client.subscribe()?;
let job_client = self.inner.job_client.clone();
let jobs = block_on(job_client.subscribe())?;
ctx.add_stream(jobs);
let seals = self
.inner
Expand All @@ -101,14 +71,43 @@ impl<C: JobClient> EventHandler<Self, MintBlockEvent> for MinerClientService<C>
fn handle_event(
&mut self,
event: MintBlockEvent,
_ctx: &mut ServiceContext<MinerClientService<C>>,
ctx: &mut ServiceContext<MinerClientService<C>>,
) {
self.inner.start_mint_work(event);
let (stop_tx, stop_rx) = unbounded();
if let Some(mut task) = self.inner.current_task.take() {
ctx.wait(async move {
if let Err(e) = task.send(true).await {
error!(
"Failed to send stop event to current task, may be finished:{:?}",
e
);
}
});
}
self.inner.current_task = Some(stop_tx);
let nonce_tx = self.inner.nonce_tx.clone();
let mut solver = dyn_clone::clone_box(&*self.inner.solver);
//this will block on handle Sealevent if use ctx spawn
thread::spawn(move || solver.solve(event, nonce_tx, stop_rx));
}
}

impl<C: JobClient> EventHandler<Self, SealEvent> for MinerClientService<C> {
fn handle_event(&mut self, event: SealEvent, _ctx: &mut ServiceContext<MinerClientService<C>>) {
self.inner.submit_seal(event)
fn handle_event(&mut self, event: SealEvent, ctx: &mut ServiceContext<MinerClientService<C>>) {
{
*self.inner.num_seals_found.lock() += 1;
let msg = format!(
"Miner client Total seals found: {:>3}",
*self.inner.num_seals_found.lock()
);
info!("{}", msg)
}
let job_client = self.inner.job_client.clone();
let fut = async move {
if let Err(err) = job_client.submit_seal(event).await {
error!("Submit seal to failed: {}", err);
}
};
ctx.spawn(fut);
}
}
45 changes: 21 additions & 24 deletions cmd/miner_client/src/stratum_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use crate::stratum_client_service::{ShareRequest, StratumClientService, SubmitSe
use crate::{JobClient, SealEvent};
use anyhow::Result;
use async_std::sync::Arc;
use async_trait::async_trait;
use byteorder::{LittleEndian, WriteBytesExt};
use futures::executor::block_on;
use futures::future;
use futures::stream::{BoxStream, StreamExt};
use logger::prelude::error;
use starcoin_service_registry::ServiceRef;
use starcoin_stratum::rpc::LoginRequest;
use starcoin_stratum::target_hex_to_difficulty;
Expand Down Expand Up @@ -34,8 +35,9 @@ impl StratumJobClient {
}
}

#[async_trait]
impl JobClient for StratumJobClient {
fn subscribe(&self) -> Result<BoxStream<'static, MintBlockEvent>> {
async fn subscribe(&self) -> Result<BoxStream<'static, MintBlockEvent>> {
let srv = self.stratum_cli_srv.clone();
let login = self.login.clone();
let fut = async move {
Expand All @@ -59,7 +61,6 @@ impl JobClient for StratumJobClient {
extra: Some(MintEventExtra {
worker_id: job.id,
job_id: job.job_id,

extra,
}),
})
Expand All @@ -72,31 +73,27 @@ impl JobClient for StratumJobClient {
})?;
Ok::<BoxStream<MintBlockEvent>, anyhow::Error>(stream.boxed())
};
block_on(fut)
fut.await
}

#[allow(clippy::unit_arg)]
fn submit_seal(&self, seal: SealEvent) -> Result<()> {
async fn submit_seal(&self, seal: SealEvent) -> Result<()> {
let srv = self.stratum_cli_srv.clone();
let fut = async move {
let mut n = Vec::new();
n.write_u32::<LittleEndian>(seal.nonce)?;
let nonce = hex::encode(n);
let mint_extra = seal
.extra
.ok_or_else(|| anyhow::anyhow!("submit missing field"))?;
let r = srv
.send(SubmitSealRequest(ShareRequest {
id: mint_extra.worker_id,
job_id: mint_extra.job_id,
nonce,
result: seal.hash_result,
}))
.await?;
Ok::<(), anyhow::Error>(r)
};

block_on(fut)
let mut n = Vec::new();
n.write_u32::<LittleEndian>(seal.nonce)?;
let nonce = hex::encode(n);
let mint_extra = seal
.extra
.ok_or_else(|| anyhow::anyhow!("submit missing field"))?;
if let Err(e) = srv.try_send(SubmitSealRequest(ShareRequest {
id: mint_extra.worker_id,
job_id: mint_extra.job_id,
nonce,
result: seal.hash_result,
})) {
error!("failed to submit seal request {:?}", e);
}
Ok(())
}

fn time_service(&self) -> Arc<dyn TimeService> {
Expand Down
2 changes: 1 addition & 1 deletion cmd/miner_client/src/stratum_client_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ impl Inner {
if let Err(err) = self.process_output(resp).await{
debug!("process output error:{:?}", err);
}

},
}
}
Expand Down Expand Up @@ -299,6 +298,7 @@ impl ServiceHandler<StratumClientService, SubmitSealRequest> for StratumClientSe
msg: SubmitSealRequest,
_ctx: &mut ServiceContext<StratumClientService>,
) -> <SubmitSealRequest as ServiceRequest>::Response {
//FIXME: Failed to receive this msg since upgrade actix to 0.13.
if let Some(sender) = self.sender.clone().take() {
if let Err(e) = sender.unbounded_send(Request::SubmitSealRequest(msg)) {
error!("stratum handle submit seal request failed:{}", e);
Expand Down
3 changes: 2 additions & 1 deletion consensus/cryptonight-rs/ext/slow-hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,7 @@ void cn_slow_hash(const void *data, size_t length, char *hash, int variant, int
memcpy(state.init, text, INIT_SIZE_BYTE);
hash_permutation(&state.hs);
extra_hashes[state.hs.b[0] & 3](&state, 200, hash);
cn_slow_hash_free_state();
}

#elif !defined NO_AES && (defined(__arm__) || defined(__aarch64__))
Expand Down Expand Up @@ -1733,4 +1734,4 @@ void cn_slow_hash(const void *data, size_t length, char *hash, int variant, int
#endif
}

#endif
#endif
4 changes: 4 additions & 0 deletions miner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ impl EventHandler<Self, GenerateBlockEvent> for MinerService {
}
if self.config.miner.disable_miner_client() && self.client_subscribers_num == 0 {
debug!("No miner client connected, ignore GenerateBlockEvent.");
// Once Miner client connect, we should dispatch task.
ctx.run_later(Duration::from_secs(2), |ctx| {
ctx.notify(GenerateBlockEvent::new(false));
});
return;
}
if let Err(err) = self.dispatch_task(ctx) {
Expand Down

0 comments on commit 8a741cb

Please sign in to comment.