Skip to content

Commit

Permalink
fix: obsolete wal entries while opening a migrated region (#4993)
Browse files Browse the repository at this point in the history
* fix: delete obsolete wal entries while opening a migrated region

* chore: add logs

* chore: rust fmt

* fix: fix fuzz test
  • Loading branch information
WenyXu authored Nov 15, 2024
1 parent 08f5900 commit 4b263ef
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 96 deletions.
4 changes: 4 additions & 0 deletions src/datanode/src/heartbeat/handler/downgrade_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ impl HandlerContext {

// Ignores flush request
if !writable {
warn!(
"Region: {region_id} is not writable, flush_timeout: {:?}",
flush_timeout
);
return self.downgrade_to_follower_gracefully(region_id).await;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ impl DowngradeLeaderRegion {
match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!("Downgrade region reply: {:?}", reply);
let InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
exists,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, Simple
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
use common_procedure::Status;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};

Expand Down Expand Up @@ -144,6 +145,7 @@ impl OpenCandidateRegion {
match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!("Received open region reply: {:?}", reply);
let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
Expand Down
6 changes: 5 additions & 1 deletion src/mito2/src/worker/handle_catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Utilizes the short circuit evaluation.
let region = if !is_mutable_empty || region.manifest_ctx.has_update().await? {
let manifest_version = region.manifest_ctx.manifest_version().await;
info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}, manifest version: {manifest_version}");
let flushed_entry_id = region.version_control.current().last_entry_id;
info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}, manifest version: {manifest_version}, flushed entry id: {flushed_entry_id}");
let reopened_region = Arc::new(
RegionOpener::new(
region_id,
Expand Down Expand Up @@ -111,6 +112,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
} else {
warn!("Skips to replay memtable for region: {}", region.region_id);
let flushed_entry_id = region.version_control.current().last_entry_id;
let on_region_opened = self.wal.on_region_opened();
on_region_opened(region_id, flushed_entry_id, &region.provider).await?;
}

if request.set_writable {
Expand Down
239 changes: 144 additions & 95 deletions tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ impl Arbitrary<'_> for FuzzInput {
let rows = rng.gen_range(128..1024);
let inserts = rng.gen_range(2..8);
Ok(FuzzInput {
partitions,
seed,
columns,
partitions,
rows,
seed,
inserts,
})
}
Expand Down Expand Up @@ -133,22 +133,15 @@ struct Migration {
region_id: RegionId,
}

async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
info!("input: {:?}", input);
let mut rng = ChaChaRng::seed_from_u64(input.seed);

let create_expr = generate_create_expr(input, &mut rng)?;
let translator = CreateTableExprTranslator;
let sql = translator.translate(&create_expr)?;
let _result = sqlx::query(&sql)
.execute(&ctx.greptime)
.await
.context(error::ExecuteQuerySnafu { sql: &sql })?;

let table_ctx = Arc::new(TableContext::from(&create_expr));
async fn insert_values<R: Rng + 'static>(
ctx: &FuzzContext,
input: FuzzInput,
table_ctx: &TableContextRef,
rng: &mut R,
insert_exprs: &[InsertIntoExpr],
) -> Result<()> {
// Inserts data into the table
let insert_exprs = generate_insert_exprs(input, &mut rng, table_ctx.clone())?;
for insert_expr in &insert_exprs {
for insert_expr in insert_exprs {
let translator = InsertIntoExprTranslator;
let sql = translator.translate(insert_expr)?;
let result = ctx
Expand All @@ -168,63 +161,82 @@ async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<
}
);
if rng.gen_bool(0.2) {
flush_memtable(&ctx.greptime, &create_expr.table_name).await?;
flush_memtable(&ctx.greptime, &table_ctx.name).await?;
}
if rng.gen_bool(0.1) {
compact_table(&ctx.greptime, &create_expr.table_name).await?;
compact_table(&ctx.greptime, &table_ctx.name).await?;
}
}
Ok(())
}

// Fetches region distribution
let partitions = fetch_partitions(&ctx.greptime, table_ctx.name.clone()).await?;
let num_partitions = partitions.len();
let region_distribution = region_distribution(partitions);
info!("Region distribution: {region_distribution:?}");
let datanodes = fetch_nodes(&ctx.greptime)
.await?
.into_iter()
.flat_map(|node| {
if node.peer_type == PEER_TYPE_DATANODE {
Some(node)
} else {
None
}
})
.collect::<Vec<_>>();
info!("List datanodes: {:?}", datanodes);
async fn validate_insert_exprs(
ctx: &FuzzContext,
table_ctx: &TableContextRef,
insert_exprs: &[InsertIntoExpr],
) -> Result<()> {
info!("Validating rows");
let ts_column = table_ctx.timestamp_column().unwrap();
for (idx, insert_expr) in insert_exprs[0..insert_exprs.len() - 1].iter().enumerate() {
let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
let next_batch_ts_column_idx = insert_exprs[idx + 1].timestamp_column_idx().unwrap();
let next_batch_ts = insert_exprs[idx + 1].values_list[0][next_batch_ts_column_idx].clone();

// Generates region migration task.
let mut migrations = Vec::with_capacity(num_partitions);
let mut new_distribution: BTreeMap<u64, HashSet<_>> = BTreeMap::new();
for (datanode_id, regions) in region_distribution {
let step = rng.gen_range(1..datanodes.len());
for region in regions {
let to_peer = (datanode_id + step as u64) % datanodes.len() as u64;
new_distribution.entry(to_peer).or_default().insert(region);
migrations.push(Migration {
from_peer: datanode_id,
to_peer,
region_id: region,
})
}
let primary_keys_idx = insert_expr.primary_key_column_idx();
let column_list = format_columns(&insert_expr.columns);
let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
let select_sql = format!(
"SELECT {} FROM {} WHERE {} >= {} AND {} < {} ORDER BY {};",
column_list,
table_ctx.name,
ts_column.name,
ts_value,
ts_column.name,
next_batch_ts,
primary_keys_column_list
);
info!("Executing sql: {select_sql}");
let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
let mut expected_rows = replace_default(&insert_expr.values_list, table_ctx, insert_expr);
sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
}
let insert_expr = insert_exprs.last().unwrap();
let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
let primary_keys_idx = insert_expr.primary_key_column_idx();
let column_list = format_columns(&insert_expr.columns);
let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
let select_sql = format!(
"SELECT {} FROM {} WHERE {} >= {} ORDER BY {};",
column_list, table_ctx.name, ts_column.name, ts_value, primary_keys_column_list
);
info!("Executing sql: {select_sql}");
let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
let mut expected_rows = replace_default(&insert_expr.values_list, table_ctx, insert_expr);
sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;

Ok(())
}

async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<()> {
let mut procedure_ids = Vec::with_capacity(migrations.len());
// Triggers region migrations

for Migration {
from_peer,
to_peer,
region_id,
} in &migrations
} in migrations
{
let procedure_id =
migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 240).await;
info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}");
procedure_ids.push(procedure_id);
}
info!("Excepted new region distribution: {new_distribution:?}");

for (migration, procedure_id) in migrations.into_iter().zip(procedure_ids) {
for (migration, procedure_id) in migrations.iter().zip(procedure_ids) {
info!("Waits for migration: {migration:?}");
let region_id = migration.region_id.as_u64();
wait_condition_fn(
Expand All @@ -249,49 +261,86 @@ async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<
.await;
}

// Values validation
info!("Validating rows");
let ts_column = table_ctx.timestamp_column().unwrap();
for (idx, insert_expr) in insert_exprs[0..insert_exprs.len() - 1].iter().enumerate() {
let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
let next_batch_ts_column_idx = insert_exprs[idx + 1].timestamp_column_idx().unwrap();
let next_batch_ts = insert_exprs[idx + 1].values_list[0][next_batch_ts_column_idx].clone();
Ok(())
}

let primary_keys_idx = insert_expr.primary_key_column_idx();
let column_list = format_columns(&insert_expr.columns);
let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
let select_sql = format!(
"SELECT {} FROM {} WHERE {} >= {} AND {} < {} ORDER BY {};",
column_list,
create_expr.table_name,
ts_column.name,
ts_value,
ts_column.name,
next_batch_ts,
primary_keys_column_list
);
info!("Executing sql: {select_sql}");
let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
let mut expected_rows = replace_default(&insert_expr.values_list, &table_ctx, insert_expr);
sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
info!("input: {:?}", input);
let mut rng = ChaChaRng::seed_from_u64(input.seed);

let create_expr = generate_create_expr(input, &mut rng)?;
let translator = CreateTableExprTranslator;
let sql = translator.translate(&create_expr)?;
let _result = sqlx::query(&sql)
.execute(&ctx.greptime)
.await
.context(error::ExecuteQuerySnafu { sql: &sql })?;

let table_ctx = Arc::new(TableContext::from(&create_expr));
let mut insert_exprs = generate_insert_exprs(input, &mut rng, table_ctx.clone())?;
let remaining_insert_exprs = insert_exprs.split_off(insert_exprs.len() / 2);
insert_values(&ctx, input, &table_ctx, &mut rng, &insert_exprs).await?;

// Fetches region distribution
let partitions = fetch_partitions(&ctx.greptime, table_ctx.name.clone()).await?;
let num_partitions = partitions.len();
let region_distribution = region_distribution(partitions);
info!("Region distribution: {region_distribution:?}");
let datanodes = fetch_nodes(&ctx.greptime)
.await?
.into_iter()
.flat_map(|node| {
if node.peer_type == PEER_TYPE_DATANODE {
Some(node)
} else {
None
}
})
.collect::<Vec<_>>();
info!("List datanodes: {:?}", datanodes);

// Generates region migration task.
let mut migrations = Vec::with_capacity(num_partitions);
let mut new_distribution: BTreeMap<u64, HashSet<_>> = BTreeMap::new();
for (datanode_id, regions) in region_distribution {
let step = rng.gen_range(1..datanodes.len());
for region in regions {
let to_peer = (datanode_id + step as u64) % datanodes.len() as u64;
new_distribution.entry(to_peer).or_default().insert(region);
migrations.push(Migration {
from_peer: datanode_id,
to_peer,
region_id: region,
})
}
}
let insert_expr = insert_exprs.last().unwrap();
let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
let primary_keys_idx = insert_expr.primary_key_column_idx();
let column_list = format_columns(&insert_expr.columns);
let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
let select_sql = format!(
"SELECT {} FROM {} WHERE {} >= {} ORDER BY {};",
column_list, create_expr.table_name, ts_column.name, ts_value, primary_keys_column_list
);
info!("Executing sql: {select_sql}");
let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
let mut expected_rows = replace_default(&insert_expr.values_list, &table_ctx, insert_expr);
sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;

info!("Excepted new region distribution: {new_distribution:?}");
// Triggers region migrations
migrate_regions(&ctx, &migrations).await?;
// Values validation
validate_insert_exprs(&ctx, &table_ctx, &insert_exprs).await?;

insert_values(&ctx, input, &table_ctx, &mut rng, &remaining_insert_exprs).await?;
// Recovers region distribution
let migrations = migrations
.into_iter()
.map(
|Migration {
from_peer,
to_peer,
region_id,
}| Migration {
from_peer: to_peer,
to_peer: from_peer,
region_id,
},
)
.collect::<Vec<_>>();
// Triggers region migrations
migrate_regions(&ctx, &migrations).await?;
// Values validation
validate_insert_exprs(&ctx, &table_ctx, &remaining_insert_exprs).await?;

// Cleans up
let sql = format!("DROP TABLE {}", table_ctx.name);
Expand Down

0 comments on commit 4b263ef

Please sign in to comment.