Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: obsolete WAL entries while opening a migrated region #4993

Merged
merged 4 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/datanode/src/heartbeat/handler/downgrade_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ impl HandlerContext {

// Ignores flush request
if !writable {
warn!(
"Region: {region_id} is not writable, flush_timeout: {:?}",
flush_timeout
);
return self.downgrade_to_follower_gracefully(region_id).await;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ impl DowngradeLeaderRegion {
match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!("Downgrade region reply: {:?}", reply);
let InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
exists,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, Simple
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
use common_procedure::Status;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};

Expand Down Expand Up @@ -144,6 +145,7 @@ impl OpenCandidateRegion {
match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!("Received open region reply: {:?}", reply);
let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
Expand Down
6 changes: 5 additions & 1 deletion src/mito2/src/worker/handle_catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Utilizes the short circuit evaluation.
let region = if !is_mutable_empty || region.manifest_ctx.has_update().await? {
let manifest_version = region.manifest_ctx.manifest_version().await;
info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}, manifest version: {manifest_version}");
let flushed_entry_id = region.version_control.current().last_entry_id;
info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}, manifest version: {manifest_version}, flushed entry id: {flushed_entry_id}");
let reopened_region = Arc::new(
RegionOpener::new(
region_id,
Expand Down Expand Up @@ -111,6 +112,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
} else {
warn!("Skips to replay memtable for region: {}", region.region_id);
let flushed_entry_id = region.version_control.current().last_entry_id;
let on_region_opened = self.wal.on_region_opened();
on_region_opened(region_id, flushed_entry_id, &region.provider).await?;
}

if request.set_writable {
Expand Down
239 changes: 144 additions & 95 deletions tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ impl Arbitrary<'_> for FuzzInput {
let rows = rng.gen_range(128..1024);
let inserts = rng.gen_range(2..8);
Ok(FuzzInput {
partitions,
seed,
columns,
partitions,
rows,
seed,
inserts,
})
}
Expand Down Expand Up @@ -133,22 +133,15 @@ struct Migration {
region_id: RegionId,
}

async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
info!("input: {:?}", input);
let mut rng = ChaChaRng::seed_from_u64(input.seed);

let create_expr = generate_create_expr(input, &mut rng)?;
let translator = CreateTableExprTranslator;
let sql = translator.translate(&create_expr)?;
let _result = sqlx::query(&sql)
.execute(&ctx.greptime)
.await
.context(error::ExecuteQuerySnafu { sql: &sql })?;

let table_ctx = Arc::new(TableContext::from(&create_expr));
async fn insert_values<R: Rng + 'static>(
ctx: &FuzzContext,
input: FuzzInput,
table_ctx: &TableContextRef,
rng: &mut R,
insert_exprs: &[InsertIntoExpr],
) -> Result<()> {
// Inserts data into the table
let insert_exprs = generate_insert_exprs(input, &mut rng, table_ctx.clone())?;
for insert_expr in &insert_exprs {
for insert_expr in insert_exprs {
let translator = InsertIntoExprTranslator;
let sql = translator.translate(insert_expr)?;
let result = ctx
Expand All @@ -168,63 +161,82 @@ async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<
}
);
if rng.gen_bool(0.2) {
flush_memtable(&ctx.greptime, &create_expr.table_name).await?;
flush_memtable(&ctx.greptime, &table_ctx.name).await?;
}
if rng.gen_bool(0.1) {
compact_table(&ctx.greptime, &create_expr.table_name).await?;
compact_table(&ctx.greptime, &table_ctx.name).await?;
}
}
Ok(())
}

// Fetches region distribution
let partitions = fetch_partitions(&ctx.greptime, table_ctx.name.clone()).await?;
let num_partitions = partitions.len();
let region_distribution = region_distribution(partitions);
info!("Region distribution: {region_distribution:?}");
let datanodes = fetch_nodes(&ctx.greptime)
.await?
.into_iter()
.flat_map(|node| {
if node.peer_type == PEER_TYPE_DATANODE {
Some(node)
} else {
None
}
})
.collect::<Vec<_>>();
info!("List datanodes: {:?}", datanodes);
async fn validate_insert_exprs(
ctx: &FuzzContext,
table_ctx: &TableContextRef,
insert_exprs: &[InsertIntoExpr],
) -> Result<()> {
info!("Validating rows");
let ts_column = table_ctx.timestamp_column().unwrap();
for (idx, insert_expr) in insert_exprs[0..insert_exprs.len() - 1].iter().enumerate() {
let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
let next_batch_ts_column_idx = insert_exprs[idx + 1].timestamp_column_idx().unwrap();
let next_batch_ts = insert_exprs[idx + 1].values_list[0][next_batch_ts_column_idx].clone();

// Generates region migration task.
let mut migrations = Vec::with_capacity(num_partitions);
let mut new_distribution: BTreeMap<u64, HashSet<_>> = BTreeMap::new();
for (datanode_id, regions) in region_distribution {
let step = rng.gen_range(1..datanodes.len());
for region in regions {
let to_peer = (datanode_id + step as u64) % datanodes.len() as u64;
new_distribution.entry(to_peer).or_default().insert(region);
migrations.push(Migration {
from_peer: datanode_id,
to_peer,
region_id: region,
})
}
let primary_keys_idx = insert_expr.primary_key_column_idx();
let column_list = format_columns(&insert_expr.columns);
let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
let select_sql = format!(
"SELECT {} FROM {} WHERE {} >= {} AND {} < {} ORDER BY {};",
column_list,
table_ctx.name,
ts_column.name,
ts_value,
ts_column.name,
next_batch_ts,
primary_keys_column_list
);
info!("Executing sql: {select_sql}");
let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
let mut expected_rows = replace_default(&insert_expr.values_list, table_ctx, insert_expr);
sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
}
let insert_expr = insert_exprs.last().unwrap();
let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
let primary_keys_idx = insert_expr.primary_key_column_idx();
let column_list = format_columns(&insert_expr.columns);
let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
let select_sql = format!(
"SELECT {} FROM {} WHERE {} >= {} ORDER BY {};",
column_list, table_ctx.name, ts_column.name, ts_value, primary_keys_column_list
);
info!("Executing sql: {select_sql}");
let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
let mut expected_rows = replace_default(&insert_expr.values_list, table_ctx, insert_expr);
sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;

Ok(())
}

async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<()> {
let mut procedure_ids = Vec::with_capacity(migrations.len());
// Triggers region migrations

for Migration {
from_peer,
to_peer,
region_id,
} in &migrations
} in migrations
{
let procedure_id =
migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 240).await;
info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}");
procedure_ids.push(procedure_id);
}
info!("Excepted new region distribution: {new_distribution:?}");

for (migration, procedure_id) in migrations.into_iter().zip(procedure_ids) {
for (migration, procedure_id) in migrations.iter().zip(procedure_ids) {
info!("Waits for migration: {migration:?}");
let region_id = migration.region_id.as_u64();
wait_condition_fn(
Expand All @@ -249,49 +261,86 @@ async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<
.await;
}

// Values validation
info!("Validating rows");
let ts_column = table_ctx.timestamp_column().unwrap();
for (idx, insert_expr) in insert_exprs[0..insert_exprs.len() - 1].iter().enumerate() {
let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
let next_batch_ts_column_idx = insert_exprs[idx + 1].timestamp_column_idx().unwrap();
let next_batch_ts = insert_exprs[idx + 1].values_list[0][next_batch_ts_column_idx].clone();
Ok(())
}

let primary_keys_idx = insert_expr.primary_key_column_idx();
let column_list = format_columns(&insert_expr.columns);
let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
let select_sql = format!(
"SELECT {} FROM {} WHERE {} >= {} AND {} < {} ORDER BY {};",
column_list,
create_expr.table_name,
ts_column.name,
ts_value,
ts_column.name,
next_batch_ts,
primary_keys_column_list
);
info!("Executing sql: {select_sql}");
let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
let mut expected_rows = replace_default(&insert_expr.values_list, &table_ctx, insert_expr);
sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;
async fn execute_region_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
info!("input: {:?}", input);
let mut rng = ChaChaRng::seed_from_u64(input.seed);

let create_expr = generate_create_expr(input, &mut rng)?;
let translator = CreateTableExprTranslator;
let sql = translator.translate(&create_expr)?;
let _result = sqlx::query(&sql)
.execute(&ctx.greptime)
.await
.context(error::ExecuteQuerySnafu { sql: &sql })?;

let table_ctx = Arc::new(TableContext::from(&create_expr));
let mut insert_exprs = generate_insert_exprs(input, &mut rng, table_ctx.clone())?;
let remaining_insert_exprs = insert_exprs.split_off(insert_exprs.len() / 2);
insert_values(&ctx, input, &table_ctx, &mut rng, &insert_exprs).await?;

// Fetches region distribution
let partitions = fetch_partitions(&ctx.greptime, table_ctx.name.clone()).await?;
let num_partitions = partitions.len();
let region_distribution = region_distribution(partitions);
info!("Region distribution: {region_distribution:?}");
let datanodes = fetch_nodes(&ctx.greptime)
.await?
.into_iter()
.flat_map(|node| {
if node.peer_type == PEER_TYPE_DATANODE {
Some(node)
} else {
None
}
})
.collect::<Vec<_>>();
info!("List datanodes: {:?}", datanodes);

// Generates region migration task.
let mut migrations = Vec::with_capacity(num_partitions);
let mut new_distribution: BTreeMap<u64, HashSet<_>> = BTreeMap::new();
for (datanode_id, regions) in region_distribution {
let step = rng.gen_range(1..datanodes.len());
for region in regions {
let to_peer = (datanode_id + step as u64) % datanodes.len() as u64;
new_distribution.entry(to_peer).or_default().insert(region);
migrations.push(Migration {
from_peer: datanode_id,
to_peer,
region_id: region,
})
}
}
let insert_expr = insert_exprs.last().unwrap();
let ts_column_idx = insert_expr.timestamp_column_idx().unwrap();
let ts_value = insert_expr.values_list[0][ts_column_idx].clone();
let primary_keys_idx = insert_expr.primary_key_column_idx();
let column_list = format_columns(&insert_expr.columns);
let primary_keys_column_list = format_columns(&insert_expr.primary_key_columns());
let select_sql = format!(
"SELECT {} FROM {} WHERE {} >= {} ORDER BY {};",
column_list, create_expr.table_name, ts_column.name, ts_value, primary_keys_column_list
);
info!("Executing sql: {select_sql}");
let fetched_rows = ctx.greptime.fetch_all(select_sql.as_str()).await.unwrap();
let mut expected_rows = replace_default(&insert_expr.values_list, &table_ctx, insert_expr);
sort_by_primary_keys(&mut expected_rows, primary_keys_idx);
validator::row::assert_eq::<MySql>(&insert_expr.columns, &fetched_rows, &expected_rows)?;

info!("Excepted new region distribution: {new_distribution:?}");
// Triggers region migrations
migrate_regions(&ctx, &migrations).await?;
// Values validation
validate_insert_exprs(&ctx, &table_ctx, &insert_exprs).await?;

insert_values(&ctx, input, &table_ctx, &mut rng, &remaining_insert_exprs).await?;
// Recovers region distribution
let migrations = migrations
.into_iter()
.map(
|Migration {
from_peer,
to_peer,
region_id,
}| Migration {
from_peer: to_peer,
to_peer: from_peer,
region_id,
},
)
.collect::<Vec<_>>();
// Triggers region migrations
migrate_regions(&ctx, &migrations).await?;
// Values validation
validate_insert_exprs(&ctx, &table_ctx, &remaining_insert_exprs).await?;

// Cleans up
let sql = format!("DROP TABLE {}", table_ctx.name);
Expand Down