Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta/management): allow to join a cluster if a meta node has no log #8384

Merged
merged 1 commit into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/binaries/meta/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,12 @@ async fn main(_global_tracker: Arc<RuntimeTracker>) -> common_exception::Result<
}

// Join a raft cluster only after all service started.
meta_node
let join_res = meta_node
.join_cluster(&conf.raft_config, conf.grpc_api_address.clone())
.await?;

info!("join result: {:?}", join_res);

// Print information to users.
println!("Databend Metasrv");
println!();
Expand Down
45 changes: 34 additions & 11 deletions src/meta/service/src/meta_service/raftmeta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use common_meta_raft_store::state_machine::StateMachine;
use common_meta_sled_store::openraft;
use common_meta_sled_store::openraft::error::AddLearnerError;
use common_meta_sled_store::openraft::DefensiveCheck;
use common_meta_sled_store::openraft::RaftStorage;
use common_meta_sled_store::openraft::StorageError;
use common_meta_sled_store::openraft::StoreExt;
use common_meta_sled_store::SledKeySpace;
use common_meta_stoerr::MetaStorageError;
Expand Down Expand Up @@ -573,17 +575,22 @@ impl MetaNode {
&self,
conf: &RaftConfig,
grpc_api_addr: String,
) -> Result<(), MetaManagementError> {
) -> Result<Result<(), &'static str>, MetaManagementError> {
if conf.join.is_empty() {
info!("'--join' is empty, do not need joining cluster");
return Ok(());
return Ok(Err("Did not join: --join is empty"));
}

// Try to join a cluster only when this node is just created.
// Try to join a cluster only when this node has no log.
// Joining a node that already has log risks messing up the data in both this node and the target cluster.
if self.is_opened() {
info!("meta node is already initialized, skip joining it to a cluster");
return Ok(());
let to_join = self
.can_join()
.await
.map_err(|e| MetaManagementError::Join(AnyError::new(&e)))?;

if !to_join {
info!("meta node has log, skip joining");
return Ok(Err("Did not join: node already has log"));
}

let mut errors = vec![];
Expand Down Expand Up @@ -632,11 +639,20 @@ impl MetaNode {
match join_res {
Ok(r) => {
let reply = r.into_inner();
if !reply.data.is_empty() {
info!("join cluster via {} success: {:?}", addr, reply.data);
return Ok(());
} else {
error!("join cluster via {} fail: {:?}", addr, reply.error);

let res: Result<ForwardResponse, MetaAPIError> = reply.into();
match res {
Ok(v) => {
info!("join cluster via {} success: {:?}", addr, v);
return Ok(Ok(()));
}
Err(e) => {
error!("join cluster via {} fail: {}", addr, e.to_string());
errors.push(
AnyError::new(&e)
.add_context(|| format!("join via: {}", addr.clone())),
);
}
}
}
Err(s) => {
Expand All @@ -655,6 +671,13 @@ impl MetaNode {
))))
}

/// Decide whether this meta-node is in a state where joining a cluster is appropriate.
///
/// Reads the log state from the local store and logs it.
/// Returns `Ok(true)` when the log state has no `last_log_id`, `Ok(false)` otherwise.
///
/// # Errors
///
/// Propagates the `StorageError` returned when reading the log state fails.
async fn can_join(&self) -> Result<bool, StorageError> {
    let log_state = self.sto.get_log_state().await?;
    info!("check can_join: log_state: {:?}", log_state);

    // Joining is only allowed when the store reports no last log id.
    let has_no_log = log_state.last_log_id.is_none();
    Ok(has_no_log)
}

async fn do_start(conf: &MetaConfig) -> Result<Arc<MetaNode>, MetaStartupError> {
let raft_conf = &conf.raft_config;

Expand Down
21 changes: 8 additions & 13 deletions src/meta/service/tests/it/api/http/cluster_state_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

use std::fs::File;
use std::io::Read;
use std::string::String;
Expand Down Expand Up @@ -41,22 +41,20 @@ use crate::tests::tls_constants::TEST_SERVER_CERT;
use crate::tests::tls_constants::TEST_SERVER_KEY;

#[async_entry::test(worker_threads = 3, init = "init_meta_ut!()", tracing_span = "debug")]
async fn test_cluster_nodes() -> common_exception::Result<()> {
async fn test_cluster_nodes() -> anyhow::Result<()> {
let tc0 = MetaSrvTestContext::new(0);
let mut tc1 = MetaSrvTestContext::new(1);

tc1.config.raft_config.single = false;
tc1.config.raft_config.join = vec![tc0.config.raft_config.raft_api_addr().await?.to_string()];

let meta_node = MetaNode::start(&tc0.config).await?;
meta_node
.join_cluster(&tc0.config.raft_config, tc0.config.grpc_api_address)
.await?;

let meta_node1 = MetaNode::start(&tc1.config).await?;
meta_node1
let res = meta_node1
.join_cluster(&tc1.config.raft_config, tc1.config.grpc_api_address)
.await?;
assert_eq!(Ok(()), res);

let cluster_router = Route::new()
.at("/cluster/nodes", get(nodes_handler))
Expand All @@ -81,20 +79,17 @@ async fn test_cluster_nodes() -> common_exception::Result<()> {
}

#[async_entry::test(worker_threads = 3, init = "init_meta_ut!()", tracing_span = "debug")]
async fn test_cluster_state() -> common_exception::Result<()> {
async fn test_cluster_state() -> anyhow::Result<()> {
let tc0 = MetaSrvTestContext::new(0);
let mut tc1 = MetaSrvTestContext::new(1);

tc1.config.raft_config.single = false;
tc1.config.raft_config.join = vec![tc0.config.raft_config.raft_api_addr().await?.to_string()];

let meta_node = MetaNode::start(&tc0.config).await?;
meta_node
.join_cluster(&tc0.config.raft_config, tc0.config.grpc_api_address)
.await?;

let meta_node1 = MetaNode::start(&tc1.config).await?;
meta_node1
let _ = meta_node1
.join_cluster(&tc1.config.raft_config, tc1.config.grpc_api_address)
.await?;

Expand Down Expand Up @@ -127,7 +122,7 @@ async fn test_cluster_state() -> common_exception::Result<()> {
}

#[async_entry::test(worker_threads = 3, init = "init_meta_ut!()", tracing_span = "debug")]
async fn test_http_service_cluster_state() -> common_exception::Result<()> {
async fn test_http_service_cluster_state() -> anyhow::Result<()> {
let addr_str = "127.0.0.1:30003";

let tc0 = MetaSrvTestContext::new(0);
Expand All @@ -142,7 +137,7 @@ async fn test_http_service_cluster_state() -> common_exception::Result<()> {
let _meta_node0 = MetaNode::start(&tc0.config).await?;

let meta_node1 = MetaNode::start(&tc1.config).await?;
meta_node1
let _ = meta_node1
.join_cluster(&tc1.config.raft_config, tc1.config.grpc_api_address.clone())
.await?;

Expand Down
4 changes: 0 additions & 4 deletions src/meta/service/tests/it/api/http_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,12 @@ use crate::tests::tls_constants::TEST_SERVER_KEY;
async fn test_http_service_tls_server() -> Result<()> {
let mut conf = Config::default();
let addr_str = "127.0.0.1:30002";
let grpc_api_addr = "127.0.0.1: 40002";

conf.admin_tls_server_key = TEST_SERVER_KEY.to_owned();
conf.admin_tls_server_cert = TEST_SERVER_CERT.to_owned();
conf.admin_api_address = addr_str.to_owned();
let tc = MetaSrvTestContext::new(0);
let meta_node = MetaNode::start(&tc.config).await?;
meta_node
.join_cluster(&tc.config.raft_config, grpc_api_addr.to_string())
.await?;

let mut srv = HttpService::create(conf, meta_node);
// test cert is issued for "localhost"
Expand Down
59 changes: 59 additions & 0 deletions src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,65 @@ async fn test_meta_node_join_rejoin() -> anyhow::Result<()> {
Ok(())
}

#[async_entry::test(worker_threads = 5, init = "init_meta_ut!()", tracing_span = "debug")]
async fn test_meta_node_join_with_log() -> anyhow::Result<()> {
    // Verifies two things about `MetaNode::join_cluster`:
    // - joining is allowed when the store is initialized but has no raft-log;
    // - joining is refused once the store already has raft-log.
    //
    // A cluster of 3 is used so that a quorum of 2 remains while node-2 is stopped.

    let tc0 = MetaSrvTestContext::new(0);
    let mut tc1 = MetaSrvTestContext::new(1);
    let mut tc2 = MetaSrvTestContext::new(2);

    // Node-1 and node-2 are non-single nodes that join via node-0's raft API address.
    for tc in [&mut tc1, &mut tc2] {
        tc.config.raft_config.single = false;
        tc.config.raft_config.join =
            vec![tc0.config.raft_config.raft_api_addr().await?.to_string()];
    }

    // Node-0 has an empty `join` list, so it reports that it did not join.
    let node0 = MetaNode::start(&tc0.config).await?;
    let joined0 = node0
        .join_cluster(&tc0.config.raft_config, tc0.config.grpc_api_address)
        .await?;
    assert_eq!(Err("Did not join: --join is empty"), joined0);

    // Node-1 joins through node-0.
    let node1 = MetaNode::start(&tc1.config).await?;
    let joined1 = node1
        .join_cluster(&tc1.config.raft_config, tc1.config.grpc_api_address.clone())
        .await?;
    assert_eq!(Ok(()), joined1);

    info!("--- initialize store for node-2");
    {
        // Start and stop without joining.
        let node2 = MetaNode::start(&tc2.config).await?;
        node2.stop().await?;
    }

    info!("--- Allow to join node-2 with initialized store");
    {
        let node2 = MetaNode::start(&tc2.config).await?;
        let joined2 = node2
            .join_cluster(&tc2.config.raft_config, tc2.config.grpc_api_address.clone())
            .await?;
        assert_eq!(Ok(()), joined2);

        node2.stop().await?;
    }

    info!("--- Not allowed to join node-2 with store with log");
    {
        // After the successful join above, a second join attempt must be refused.
        let node2 = MetaNode::start(&tc2.config).await?;
        let rejoined2 = node2
            .join_cluster(&tc2.config.raft_config, tc2.config.grpc_api_address)
            .await?;
        assert_eq!(Err("Did not join: node already has log"), rejoined2);

        node2.stop().await?;
    }

    Ok(())
}

#[async_entry::test(worker_threads = 5, init = "init_meta_ut!()", tracing_span = "debug")]
async fn test_meta_node_restart() -> anyhow::Result<()> {
// TODO check restarted follower.
Expand Down
4 changes: 3 additions & 1 deletion src/meta/service/tests/it/tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ pub async fn start_metasrv() -> Result<(MetaSrvTestContext, String)> {

pub async fn start_metasrv_with_context(tc: &mut MetaSrvTestContext) -> Result<()> {
let mn = MetaNode::start(&tc.config).await?;
mn.join_cluster(&tc.config.raft_config, tc.config.grpc_api_address.clone())
let _ = mn
.join_cluster(&tc.config.raft_config, tc.config.grpc_api_address.clone())
.await?;

let mut srv = GrpcServer::create(tc.config.clone(), mn);
srv.start().await?;
tc.grpc_srv = Some(Box::new(srv));
Expand Down