feat: batch alter logical tables (#3569)
* feat: add unit test for alter logical tables

* Update src/common/meta/src/ddl/alter_table.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* feat: add some comments

* chore: add debug_assert_eq

* chore: fix some nits

* chore: remove the method batch_get_table_routes

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
fengjiachun and waynexia authored Mar 26, 2024
1 parent 7c1c6e8 commit c2dd113
Showing 28 changed files with 1,641 additions and 245 deletions.
1 change: 1 addition & 0 deletions src/common/meta/src/ddl.rs
@@ -28,6 +28,7 @@ use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};

pub mod alter_logical_tables;
pub mod alter_table;
pub mod create_logical_tables;
pub mod create_table;
253 changes: 253 additions & 0 deletions src/common/meta/src/ddl/alter_logical_tables.rs
@@ -0,0 +1,253 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod check;
mod metadata;
mod region_request;
mod update_metadata;

use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context, LockKey, Procedure, Status};
use futures_util::future;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use strum::AsRefStr;
use table::metadata::TableId;

use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::DdlContext;
use crate::error::{Error, Result};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders};
use crate::{cache_invalidator, metrics, ClusterId};

pub struct AlterLogicalTablesProcedure {
    pub context: DdlContext,
    pub data: AlterTablesData,
}

impl AlterLogicalTablesProcedure {
    pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterLogicalTables";

    pub fn new(
        cluster_id: ClusterId,
        tasks: Vec<AlterTableTask>,
        physical_table_id: TableId,
        context: DdlContext,
    ) -> Self {
        Self {
            context,
            data: AlterTablesData {
                cluster_id,
                state: AlterTablesState::Prepare,
                tasks,
                table_info_values: vec![],
                physical_table_id,
                physical_table_route: None,
                cache_invalidate_keys: vec![],
            },
        }
    }

    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
        Ok(Self { context, data })
    }

    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
        // Checks all the tasks.
        self.check_input_tasks()?;
        // Fills the table info values.
        self.fill_table_info_values().await?;
        // Checks the physical table; must run after `fill_table_info_values`.
        self.check_physical_table().await?;
        // Fills the physical table route.
        self.fill_physical_table_route().await?;
        // Filters out the already-finished tasks.
        let finished_tasks = self.check_finished_tasks()?;
        if finished_tasks.iter().all(|x| *x) {
            return Ok(Status::done());
        }
        self.filter_task(&finished_tasks)?;

        // Next state.
        self.data.state = AlterTablesState::SubmitAlterRegionRequests;
        Ok(Status::executing(true))
    }

    pub(crate) async fn on_submit_alter_region_requests(&mut self) -> Result<Status> {
        // Safety: we have checked the state in `on_prepare`.
        let physical_table_route = self.data.physical_table_route.as_ref().unwrap();
        let leaders = find_leaders(&physical_table_route.region_routes);
        let mut alter_region_tasks = Vec::with_capacity(leaders.len());

        for peer in leaders {
            let requester = self.context.datanode_manager.datanode(&peer).await;
            let region_numbers = find_leader_regions(&physical_table_route.region_routes, &peer);

            for region_number in region_numbers {
                let request = self.make_request(region_number)?;
                let peer = peer.clone();
                let requester = requester.clone();

                alter_region_tasks.push(async move {
                    requester
                        .handle(request)
                        .await
                        .map_err(add_peer_context_if_needed(peer))
                });
            }
        }

        future::join_all(alter_region_tasks)
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()?;

        self.data.state = AlterTablesState::UpdateMetadata;

        Ok(Status::executing(true))
    }

    pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
        let table_info_values = self.build_update_metadata()?;
        let manager = &self.context.table_metadata_manager;
        let chunk_size = manager.batch_update_table_info_value_chunk_size();
        if table_info_values.len() > chunk_size {
            let chunks = table_info_values
                .into_iter()
                .chunks(chunk_size)
                .into_iter()
                .map(|chunk| chunk.collect::<Vec<_>>())
                .collect::<Vec<_>>();
            for chunk in chunks {
                manager.batch_update_table_info_values(chunk).await?;
            }
        } else {
            manager
                .batch_update_table_info_values(table_info_values)
                .await?;
        }

        self.data.state = AlterTablesState::InvalidateTableCache;
        Ok(Status::executing(true))
    }

    pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
        let to_invalidate = self
            .data
            .cache_invalidate_keys
            .drain(..)
            .map(CacheIdent::TableId)
            .collect::<Vec<_>>();
        self.context
            .cache_invalidator
            .invalidate(&cache_invalidator::Context::default(), to_invalidate)
            .await?;
        Ok(Status::done())
    }
}

#[async_trait]
impl Procedure for AlterLogicalTablesProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    async fn execute(&mut self, _ctx: &Context) -> ProcedureResult<Status> {
        let error_handler = |e: Error| {
            if e.is_retry_later() {
                common_procedure::Error::retry_later(e)
            } else {
                common_procedure::Error::external(e)
            }
        };

        let state = &self.data.state;
        let step = state.as_ref();

        let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
            .with_label_values(&[step])
            .start_timer();

        match state {
            AlterTablesState::Prepare => self.on_prepare().await,
            AlterTablesState::SubmitAlterRegionRequests => {
                self.on_submit_alter_region_requests().await
            }
            AlterTablesState::UpdateMetadata => self.on_update_metadata().await,
            AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await,
        }
        .map_err(error_handler)
    }

    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
        // Locks, in order: one CatalogLock (read), one SchemaLock (read),
        // one TableLock (write, on the physical table), and one
        // TableNameLock per task; hence the capacity of `2 + 1 + tasks.len()`.
        let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
        let table_ref = self.data.tasks[0].table_ref();
        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
        lock_key.push(TableLock::Write(self.data.physical_table_id).into());

        for task in &self.data.tasks {
            lock_key.push(
                TableNameLock::new(
                    &task.alter_table.catalog_name,
                    &task.alter_table.schema_name,
                    &task.alter_table.table_name,
                )
                .into(),
            );
        }
        LockKey::new(lock_key)
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTablesData {
    cluster_id: ClusterId,
    state: AlterTablesState,
    tasks: Vec<AlterTableTask>,
    /// Table info values before the alter operation.
    /// Corresponds one-to-one with the `AlterTableTask`s in `tasks`.
    table_info_values: Vec<TableInfoValue>,
    /// The physical table that all the logical tables belong to.
    physical_table_id: TableId,
    physical_table_route: Option<PhysicalTableRouteValue>,
    cache_invalidate_keys: Vec<TableId>,
}

#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterTablesState {
    /// Prepares to alter the tables.
    Prepare,
    /// Sends the alter region requests to the datanodes.
    SubmitAlterRegionRequests,
    /// Updates table metadata.
    UpdateMetadata,
    /// Broadcasts the invalidate-table-cache instruction.
    InvalidateTableCache,
}
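
The procedure is a small persisted state machine: `execute` dispatches on `AlterTablesState`, and each step returns `Status::executing(true)` to schedule the next one (or `Status::done()` to finish). A minimal, self-contained sketch of that progression, using plain stand-in types rather than the crate's `Procedure`/`Status` machinery:

// Illustrative sketch only: mirrors the state transitions driven by
// `AlterLogicalTablesProcedure::execute`; `State` stands in for the
// crate's `AlterTablesState` plus a terminal `Done`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Prepare,
    SubmitAlterRegionRequests,
    UpdateMetadata,
    InvalidateTableCache,
    Done,
}

fn step(state: State) -> State {
    match state {
        // `on_prepare` may also jump straight to Done when every task
        // is already finished.
        State::Prepare => State::SubmitAlterRegionRequests,
        State::SubmitAlterRegionRequests => State::UpdateMetadata,
        State::UpdateMetadata => State::InvalidateTableCache,
        State::InvalidateTableCache => State::Done,
        State::Done => State::Done,
    }
}

fn main() {
    let mut state = State::Prepare;
    while state != State::Done {
        println!("executing: {state:?}");
        state = step(state);
    }
}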
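The chunked update in `on_update_metadata` splits a large batch into fixed-size pieces, presumably so each `batch_update_table_info_values` call stays within the KV backend's transaction limits. A self-contained sketch of the same `itertools` chunking pattern (requires the `itertools` crate; `CHUNK_SIZE` is a stand-in for `batch_update_table_info_value_chunk_size()`):

use itertools::Itertools;

const CHUNK_SIZE: usize = 3;

fn main() {
    let values: Vec<u32> = (1..=8).collect();
    // Split the batch into owned chunks of at most CHUNK_SIZE items each.
    let chunks: Vec<Vec<u32>> = values
        .into_iter()
        .chunks(CHUNK_SIZE)
        .into_iter()
        .map(|chunk| chunk.collect())
        .collect();
    // 8 values with a chunk size of 3 yield chunks of 3, 3, and 2.
    assert_eq!(chunks, vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]);
    println!("{chunks:?}");
}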
136 changes: 136 additions & 0 deletions src/common/meta/src/ddl/alter_logical_tables/check.rs
@@ -0,0 +1,136 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use api::v1::alter_expr::Kind;
use snafu::{ensure, OptionExt};

use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::error::{AlterLogicalTablesInvalidArgumentsSnafu, Result};
use crate::key::table_info::TableInfoValue;
use crate::key::table_route::TableRouteValue;
use crate::rpc::ddl::AlterTableTask;

impl AlterLogicalTablesProcedure {
    pub(crate) fn check_input_tasks(&self) -> Result<()> {
        self.check_schema()?;
        self.check_alter_kind()?;
        Ok(())
    }

    pub(crate) async fn check_physical_table(&self) -> Result<()> {
        let table_route_manager = self.context.table_metadata_manager.table_route_manager();
        let table_ids = self
            .data
            .table_info_values
            .iter()
            .map(|v| v.table_info.ident.table_id)
            .collect::<Vec<_>>();
        let table_routes = table_route_manager
            .table_route_storage()
            .batch_get(&table_ids)
            .await?;
        let physical_table_id = self.data.physical_table_id;
        let is_same_physical_table = table_routes.iter().all(|r| {
            if let Some(TableRouteValue::Logical(r)) = r {
                r.physical_table_id() == physical_table_id
            } else {
                false
            }
        });

        ensure!(
            is_same_physical_table,
            AlterLogicalTablesInvalidArgumentsSnafu {
                err_msg: "All the tasks should have the same physical table id"
            }
        );

        Ok(())
    }

    pub(crate) fn check_finished_tasks(&self) -> Result<Vec<bool>> {
        let tasks = &self.data.tasks;
        let table_info_values = &self.data.table_info_values;

        Ok(tasks
            .iter()
            .zip(table_info_values.iter())
            .map(|(task, table)| Self::check_finished_task(task, table))
            .collect())
    }

    // Checks that all the tasks belong to the same catalog and schema.
    fn check_schema(&self) -> Result<()> {
        let is_same_schema = self.data.tasks.windows(2).all(|pair| {
            pair[0].alter_table.catalog_name == pair[1].alter_table.catalog_name
                && pair[0].alter_table.schema_name == pair[1].alter_table.schema_name
        });

        ensure!(
            is_same_schema,
            AlterLogicalTablesInvalidArgumentsSnafu {
                err_msg: "Schemas of the tasks are not the same"
            }
        );

        Ok(())
    }

    fn check_alter_kind(&self) -> Result<()> {
        for task in &self.data.tasks {
            let kind = task.alter_table.kind.as_ref().context(
                AlterLogicalTablesInvalidArgumentsSnafu {
                    err_msg: "Alter kind is missing",
                },
            )?;
            let Kind::AddColumns(_) = kind else {
                return AlterLogicalTablesInvalidArgumentsSnafu {
                    err_msg: "Only the add-columns operation is supported",
                }
                .fail();
            };
        }

        Ok(())
    }

    fn check_finished_task(task: &AlterTableTask, table: &TableInfoValue) -> bool {
        let columns = table
            .table_info
            .meta
            .schema
            .column_schemas
            .iter()
            .map(|c| &c.name)
            .collect::<HashSet<_>>();

        let Some(kind) = task.alter_table.kind.as_ref() else {
            return true; // Unreachable; already validated in `check_alter_kind`.
        };
        let Kind::AddColumns(add_columns) = kind else {
            return true; // Unreachable; already validated in `check_alter_kind`.
        };

        // A task is considered finished only when *all* of its columns exist.
        // That is, if some of the columns are added but others are not, the
        // task is still unfinished.
        add_columns
            .add_columns
            .iter()
            .map(|add_column| add_column.column_def.as_ref().map(|c| &c.name))
            .all(|column| column.map(|c| columns.contains(c)).unwrap_or(false))
    }
}
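
The all-or-nothing semantics of `check_finished_task` can be restated with plain string sets. This is an illustrative sketch, not the crate's API; `existing` and `to_add` stand in for the column metadata:

use std::collections::HashSet;

// A task counts as finished only when every column it adds already exists.
fn is_finished(existing: &[&str], to_add: &[&str]) -> bool {
    let columns: HashSet<&str> = existing.iter().copied().collect();
    to_add.iter().all(|c| columns.contains(c))
}

fn main() {
    // All requested columns present: finished.
    assert!(is_finished(&["host", "ts", "cpu"], &["cpu"]));
    // One column still missing: the whole task is unfinished.
    assert!(!is_finished(&["host", "ts", "cpu"], &["cpu", "mem"]));
}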
