Skip to content

Commit

Permalink
feat: manually shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangsoledad committed Jan 30, 2019
1 parent e0143eb commit 32e4ca5
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 75 deletions.
46 changes: 30 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 28 additions & 23 deletions network/src/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,38 @@ use crate::ckb_protocol_handler::{CKBProtocolContext, DefaultCKBProtocolContext}
use crate::network::Network;
use crate::NetworkConfig;
use crate::{Error, ErrorKind, ProtocolId};
use ckb_util::Mutex;
use futures::future::Future;
use futures::sync::oneshot;
use log::{debug, info};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::sync::Arc;
use std::thread;
use tokio::runtime;

pub struct NetworkService {
network: Arc<Network>,
close_tx: Option<oneshot::Sender<()>>,
join_handle: Option<thread::JoinHandle<()>>,
pub struct StopHandler {
signal: oneshot::Sender<()>,
thread: thread::JoinHandle<()>,
}

impl Drop for NetworkService {
fn drop(&mut self) {
self.shutdown().expect("shutdown CKB network service");
impl StopHandler {
pub fn new(signal: oneshot::Sender<()>, thread: thread::JoinHandle<()>) -> StopHandler {
StopHandler { signal, thread }
}

pub fn close(self) {
let StopHandler { signal, thread } = self;
if let Err(e) = signal.send(()) {
debug!(target: "network", "send shutdown signal error, ignoring error: {:?}", e)
};
thread.join().expect("join network_service thread");
}
}

pub struct NetworkService {
network: Arc<Network>,
stop_handler: Mutex<Option<StopHandler>>,
}

impl NetworkService {
#[inline]
pub fn external_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
Expand Down Expand Up @@ -82,26 +94,19 @@ impl NetworkService {
})?;
Ok(NetworkService {
network,
join_handle: Some(join_handle),
close_tx: Some(close_tx),
stop_handler: Mutex::new(Some(StopHandler::new(close_tx, join_handle))),
})
}

// Send shutdown signal to server
// This method will not wait for the server stopped, you should use server_future or
// thread_handle to achieve that.
fn shutdown(&mut self) -> Result<(), IoError> {
debug!(target: "network", "shutdown network service self: {:?}", self.external_urls(1).get(0).map(|(addr, _)|addr.to_owned()));
if let Some(close_tx) = self.close_tx.take() {
let _ = close_tx
.send(())
.map_err(|err| debug!(target: "network", "send shutdown signal error, ignoring error: {:?}", err));
};
if let Some(join_handle) = self.join_handle.take() {
join_handle.join().map_err(|_| {
IoError::new(IoErrorKind::Other, "can't join network_service thread")
})?
}
Ok(())
pub fn close(&self) {
let handler = self
.stop_handler
.lock()
.take()
.expect("network_service can only close once");
handler.close();
}
}
7 changes: 4 additions & 3 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ ckb-chain = { path = "../chain" }
ckb-miner = { path = "../miner" }
ckb-protocol = { path = "../protocol" }
ckb-pow = { path = "../pow"}
jsonrpc-core = "9.0"
jsonrpc-core = { git = "https://github.com/nervosnetwork/jsonrpc.git", branch = "2018-edition" }
jsonrpc-macros = { git = "https://github.com/nervosnetwork/jsonrpc.git", branch = "2018-edition" }
jsonrpc-http-server = "9.0"
jsonrpc-server-utils = "9.0"
jsonrpc-http-server = { git = "https://github.com/nervosnetwork/jsonrpc.git", branch = "2018-edition" }
jsonrpc-server-utils = { git = "https://github.com/nervosnetwork/jsonrpc.git", branch = "2018-edition" }
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
Expand All @@ -31,6 +31,7 @@ num_cpus = "1.0"
faster-hex = "0.3"
jsonrpc-types = { path = "../util/jsonrpc-types" }
build-info = { path = "../util/build-info" }
futures = "0.1"

[dev-dependencies]
ckb-db = { path = "../db" }
Expand Down
77 changes: 59 additions & 18 deletions rpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,37 @@ use ckb_pool::txs_pool::TransactionPoolController;
use ckb_pow::Clicker;
use ckb_shared::index::ChainIndex;
use ckb_shared::shared::Shared;
use futures::sync::oneshot;
use jsonrpc_core::IoHandler;
use jsonrpc_http_server::ServerBuilder;
use jsonrpc_http_server::{Server, ServerBuilder};
use jsonrpc_server_utils::cors::AccessControlAllowOrigin;
use jsonrpc_server_utils::hosts::DomainsValidation;
use log::info;
use log::{error, info};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

pub struct RpcServer {
pub config: Config,
server: Option<Server>,
thread: Option<JoinHandle<()>>,
closes: Option<Vec<oneshot::Sender<()>>>,
}

impl RpcServer {
pub fn start<CI: ChainIndex + 'static>(
&self,
pub fn new<CI: ChainIndex + 'static>(
config: Config,
network: Arc<NetworkService>,
shared: Shared<CI>,
tx_pool: TransactionPoolController,
chain: ChainController,
block_assembler: BlockAssemblerController,
test_engine: Option<Arc<Clicker>>,
) where
) -> RpcServer
where
CI: ChainIndex,
{
let mut io = IoHandler::new();

if self.config.chain_enable() {
if config.chain_enable() {
io.extend_with(
ChainRpcImpl {
shared: shared.clone(),
Expand All @@ -44,7 +49,7 @@ impl RpcServer {
);
}

if self.config.pool_enable() {
if config.pool_enable() {
io.extend_with(
PoolRpcImpl {
network: Arc::clone(&network),
Expand All @@ -54,7 +59,7 @@ impl RpcServer {
);
}

if self.config.miner_enable() {
if config.miner_enable() {
io.extend_with(
MinerRpcImpl {
shared,
Expand All @@ -66,7 +71,7 @@ impl RpcServer {
);
}

if self.config.net_enable() {
if config.net_enable() {
io.extend_with(
NetworkRpcImpl {
network: Arc::clone(&network),
Expand All @@ -75,7 +80,7 @@ impl RpcServer {
);
}

if self.config.trace_enable() {
if config.trace_enable() {
io.extend_with(
TraceRpcImpl {
network: Arc::clone(&network),
Expand All @@ -95,17 +100,53 @@ impl RpcServer {
);
}

let server = ServerBuilder::new(io)
let mut server = ServerBuilder::new(io)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Null,
AccessControlAllowOrigin::Any,
]))
.threads(self.config.threads.unwrap_or_else(num_cpus::get))
.max_request_body_size(self.config.max_request_body_size)
.start_http(&self.config.listen_address.parse().unwrap())
.unwrap();
.threads(config.threads.unwrap_or_else(num_cpus::get))
.max_request_body_size(config.max_request_body_size)
.start_http(&config.listen_address.parse().unwrap())
.expect("Jsonrpc initialize");

info!(target: "rpc", "Now listening on {:?}", server.address());
server.wait();
let closes = server.take_close();

assert!(closes.is_some());
RpcServer {
server: Some(server),
closes,
thread: None,
}
}

pub fn start(&mut self) {
let server = self.server.take().expect("Jsonrpc start only once");

let thread = thread::Builder::new()
.name("rpc".to_string())
.spawn({
move || {
info!(target: "rpc", "Now listening on {:?}", server.address());
server.wait();
}
})
.expect("Jsonrpc started");

self.thread = Some(thread);
}

pub fn close(mut self) {
if let Some(thread) = self.thread.take() {
let closes = self.closes.take().expect("jsonrpc only close once");

for close in closes {
let _ = close.send(());
}

let _ = thread.join();
} else {
error!(target: "rpc", "close failed, jsonrpc not running");
}
}
}
Loading

0 comments on commit 32e4ca5

Please sign in to comment.