Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: don't allow create table which failed when open #743

Merged
merged 3 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion analytic_engine/src/instance/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use tokio::sync::oneshot;

use crate::{
instance::{
engine::{CreateTableData, InvalidOptions, OperateByWriteWorker, Result, WriteManifest},
engine::{
CreateOpenFailedTable, CreateTableData, InvalidOptions, OperateByWriteWorker, Result,
WriteManifest,
},
write_worker::{self, CreateTableCommand, WorkerLocal},
Instance,
},
Expand All @@ -31,6 +34,13 @@ impl Instance {
) -> Result<TableDataRef> {
info!("Instance create table, request:{:?}", request);

if space.is_open_failed_table(&request.table_name) {
return CreateOpenFailedTable {
table: request.table_name,
}
.fail();
}

let mut table_opts =
table_options::merge_table_options_for_create(&request.options, &self.table_opts)
.box_err()
Expand Down
8 changes: 8 additions & 0 deletions analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,13 @@ pub enum Error {
table: String,
source: GenericError,
},

#[snafu(display(
"Table open failed and can not be created again, table:{}.\nBacktrace:\n{}",
table,
backtrace,
))]
CreateOpenFailedTable { table: String, backtrace: Backtrace },
}

define_result!(Error);
Expand Down Expand Up @@ -241,6 +248,7 @@ impl From<Error> for table_engine::engine::Error {
| Error::FlushTable { .. }
| Error::StoreVersionEdit { .. }
| Error::EncodePayloads { .. }
| Error::CreateOpenFailedTable { .. }
| Error::DoManifestSnapshot { .. } => Self::Unexpected {
source: Box::new(err),
},
Expand Down
7 changes: 6 additions & 1 deletion analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,12 @@ impl Instance {
replay_batch_size,
&read_ctx,
)
.await?;
.await
.map_err(|e| {
error!("Recovery table from wal failed, table_data:{table_data:?}, err:{e}");
space.insert_open_failed_table(table_data.name.to_string());
e
})?;

space.insert_table(table_data.clone());
Ok(Some(table_data))
Expand Down
16 changes: 15 additions & 1 deletion analytic_engine/src/space.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ pub struct Space {
/// lock
table_datas: RwLock<TableDataSet>,

/// If table open failed, request of this table is not allowed, otherwise
/// schema may become inconsistent.
// TODO: engine should provide a repair method to fix those failed tables.
open_failed_tables: RwLock<Vec<String>>,

/// Write workers
pub write_group: WriteGroup,
/// Space memtable memory usage collector
Expand All @@ -111,8 +116,9 @@ impl Space {
Self {
id,
context,
table_datas: RwLock::new(TableDataSet::new()),
write_group,
table_datas: Default::default(),
open_failed_tables: Default::default(),
mem_usage_collector: Arc::new(MemUsageCollector::with_parent(engine_mem_collector)),
write_buffer_size,
}
Expand Down Expand Up @@ -159,6 +165,14 @@ impl Space {
assert!(success);
}

pub(crate) fn insert_open_failed_table(&self, table_name: String) {
self.open_failed_tables.write().unwrap().push(table_name)
}

pub(crate) fn is_open_failed_table(&self, table_name: &String) -> bool {
self.open_failed_tables.read().unwrap().contains(table_name)
}

/// Find table under this space by table name
pub fn find_table(&self, table_name: &str) -> Option<TableDataRef> {
self.table_datas.read().unwrap().find_table(table_name)
Expand Down
13 changes: 2 additions & 11 deletions analytic_engine/src/table/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ pub struct TableLocation {
pub type TableDataRef = Arc<TableData>;

/// Manages TableDataRef
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct TableDataSet {
/// Name to table data
table_datas: HashMap<String, TableDataRef>,
Expand All @@ -533,10 +533,7 @@ pub struct TableDataSet {
impl TableDataSet {
/// Create an empty TableDataSet
pub fn new() -> Self {
Self {
table_datas: HashMap::new(),
id_to_tables: HashMap::new(),
}
Self::default()
}

/// Insert if absent, if successfully inserted, return true and return
Expand Down Expand Up @@ -596,12 +593,6 @@ impl TableDataSet {
}
}

impl Default for TableDataSet {
fn default() -> Self {
Self::new()
}
}

#[cfg(test)]
pub mod tests {
use std::sync::Arc;
Expand Down