Skip to content

Commit

Permalink
feat: support alter partition table
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 committed Oct 9, 2023
1 parent ba6575b commit 8a0df6a
Show file tree
Hide file tree
Showing 14 changed files with 586 additions and 65 deletions.
44 changes: 28 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ bytes = "1"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
ceresdbproto = "1.0.14"
ceresdbproto = { git = "https://github.com/chunshao90/ceresdbproto.git", rev = "a95b6fe9ab988869456eea7f9dbd3154a673b0c1" }
codec = { path = "components/codec" }
notifier = { path = "components/notifier" }
chrono = "0.4"
Expand Down
20 changes: 17 additions & 3 deletions analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ impl<'a> Alterer<'a> {
);

// Validate alter schema request.
self.validate_before_alter(&request)?;
// if the alter schema request is idempotent, we can skip the alter operation.
if self.validate_before_alter(&request)? {
info!(
"Instance alter schema, idempotent, skip alter, table:{:?}",
self.table_data.name
);
return Ok(());
}

// Now we can persist and update the schema, since this function is called by
// write worker, so there is no other concurrent writer altering the
Expand Down Expand Up @@ -157,14 +164,21 @@ impl<'a> Alterer<'a> {

// Most validation should be done by catalog module, so we don't do too much
// duplicate check here, especially the schema compatibility.
fn validate_before_alter(&self, request: &AlterSchemaRequest) -> Result<()> {
// Boolean return value indicates whether the alter operation is idempotent.
// If it is idempotent, we can skip the alter operation.
// true: idempotent, false: not idempotent.
fn validate_before_alter(&self, request: &AlterSchemaRequest) -> Result<bool> {
ensure!(
!self.table_data.is_dropped(),
AlterDroppedTable {
table: &self.table_data.name,
}
);

if self.table_data.schema().columns() == request.schema.columns() {
return Ok(true);
}

let current_version = self.table_data.schema_version();
ensure!(
current_version < request.schema.version(),
Expand All @@ -184,7 +198,7 @@ impl<'a> Alterer<'a> {
}
);

Ok(())
Ok(false)
}

pub async fn alter_options_of_table(
Expand Down
1 change: 1 addition & 0 deletions analytic_engine/src/sst/parquet/meta_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ impl From<ParquetMetaData> for sst_pb::ParquetMetaData {
filter: src.parquet_filter.map(|v| v.into()),
// collapsible_cols_idx is used in hybrid format ,and it's deprecated.
collapsible_cols_idx: Vec::new(),
column_values: vec![],
}
}
}
Expand Down
109 changes: 94 additions & 15 deletions partition_table_engine/src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common_types::{
schema::Schema,
};
use futures::{stream::FuturesUnordered, StreamExt};
use log::error;
use generic_error::BoxError;
use snafu::ResultExt;
use table_engine::{
Expand All @@ -36,16 +37,16 @@ use table_engine::{
},
remote::{
model::{
ReadRequest as RemoteReadRequest, TableIdentifier, WriteBatchResult,
WriteRequest as RemoteWriteRequest,
AlterTableOptionsRequest, AlterTableSchemaRequest, ReadRequest as RemoteReadRequest,
TableIdentifier, WriteBatchResult, WriteRequest as RemoteWriteRequest,
},
RemoteEngineRef,
},
stream::{PartitionedStreams, SendableRecordBatchStream},
table::{
AlterSchemaRequest, CreatePartitionRule, FlushRequest, GetRequest, LocatePartitions,
ReadRequest, Result, Scan, Table, TableId, TableStats, UnexpectedWithMsg,
UnsupportedMethod, Write, WriteBatch, WriteRequest,
AlterOptions, AlterSchema, AlterSchemaRequest, CreatePartitionRule, FlushRequest,
GetRequest, LocatePartitions, ReadRequest, Result, Scan, Table, TableId, TableStats,
UnexpectedWithMsg, UnsupportedMethod, Write, WriteBatch, WriteRequest,
},
};

Expand Down Expand Up @@ -328,20 +329,98 @@ impl Table for PartitionTableImpl {
Ok(PartitionedStreams { streams })
}

async fn alter_schema(&self, _request: AlterSchemaRequest) -> Result<usize> {
UnsupportedMethod {
table: self.name(),
method: "alter_schema",
async fn alter_schema(&self, request: AlterSchemaRequest) -> Result<usize> {
let partition_num = match self.partition_info() {
None => UnexpectedWithMsg {
msg: "partition table partition info can't be empty",
}
.fail()?,
Some(partition_info) => partition_info.get_partition_num(),
};

// Alter schema of partitions except the first one.
// Because the schema of partition table is stored in the first partition.
let mut futures = FuturesUnordered::new();
for id in 1..partition_num {
let partition = self
.remote_engine
.alter_table_schema(AlterTableSchemaRequest {
table_ident: self.get_sub_table_ident(id),
table_schema: request.schema.clone(),
pre_schema_version: request.pre_schema_version,
});
futures.push(partition);
}
.fail()

let mut rets = Vec::with_capacity(futures.len());
while let Some(ret) = futures.next().await {
if ret.is_err() {
error!("alter schema failed, err:{:?}", ret);
rets.push(ret.box_err().context(AlterSchema { table: self.name() }));
}
}
if !rets.is_empty() {
rets.remove(0)?;
}

// Alter schema of the first partition.
self.remote_engine
.alter_table_schema(AlterTableSchemaRequest {
table_ident: self.get_sub_table_ident(0),
table_schema: request.schema.clone(),
pre_schema_version: request.pre_schema_version,
})
.await
.box_err()
.context(AlterSchema { table: self.name() })?;

Ok(0)
}

async fn alter_options(&self, _options: HashMap<String, String>) -> Result<usize> {
UnsupportedMethod {
table: self.name(),
method: "alter_options",
async fn alter_options(&self, options: HashMap<String, String>) -> Result<usize> {
let partition_num = match self.partition_info() {
None => UnexpectedWithMsg {
msg: "partition table partition info can't be empty",
}
.fail()?,
Some(partition_info) => partition_info.get_partition_num(),
};

// Alter options of partitions except the first one.
// Because the schema of partition table is stored in the first partition.
let mut futures = FuturesUnordered::new();
for id in 1..partition_num {
let partition = self
.remote_engine
.alter_table_options(AlterTableOptionsRequest {
table_ident: self.get_sub_table_ident(id),
options: options.clone(),
});
futures.push(partition);
}
.fail()

let mut rets = Vec::with_capacity(futures.len());
while let Some(ret) = futures.next().await {
if ret.is_err() {
error!("alter options failed, err:{:?}", ret);
rets.push(ret.box_err().context(AlterOptions { table: self.name() }));
}
}
if !rets.is_empty() {
rets.remove(0)?;
}

// Alter options of the first partition.
self.remote_engine
.alter_table_options(AlterTableOptionsRequest {
table_ident: self.get_sub_table_ident(0),
options: options.clone(),
})
.await
.box_err()
.context(AlterOptions { table: self.name() })?;

Ok(0)
}

// Partition table is a virtual table, so it don't need to flush.
Expand Down
Loading

0 comments on commit 8a0df6a

Please sign in to comment.