Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: separate sessions for temporary tables with memory engine #16374

Merged
merged 14 commits into from
Sep 3, 2024
14 changes: 14 additions & 0 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,7 @@ impl SchemaApiTestSuite {
table_name: table_name.to_string(),
tb_id: table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;

Expand Down Expand Up @@ -1838,6 +1839,7 @@ impl SchemaApiTestSuite {
table_name: tbl_name.to_string(),
tb_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
};
mt.drop_table_by_id(plan.clone()).await?;

Expand Down Expand Up @@ -1869,6 +1871,7 @@ impl SchemaApiTestSuite {
table_name: tbl_name.to_string(),
tb_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
};
let res = mt.drop_table_by_id(plan).await;
let err = res.unwrap_err();
Expand All @@ -1889,6 +1892,7 @@ impl SchemaApiTestSuite {
table_name: tbl_name.to_string(),
tb_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
};
mt.drop_table_by_id(plan.clone()).await?;
}
Expand Down Expand Up @@ -4154,6 +4158,7 @@ impl SchemaApiTestSuite {
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;
}
Expand All @@ -4180,6 +4185,7 @@ impl SchemaApiTestSuite {
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;
let table_id = resp.table_id;
Expand Down Expand Up @@ -4255,6 +4261,7 @@ impl SchemaApiTestSuite {
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;
}
Expand Down Expand Up @@ -4282,6 +4289,7 @@ impl SchemaApiTestSuite {
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;
let table_id = resp.table_id;
Expand Down Expand Up @@ -4457,6 +4465,7 @@ impl SchemaApiTestSuite {
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;
}
Expand Down Expand Up @@ -4698,6 +4707,7 @@ impl SchemaApiTestSuite {
table_name: tbl_name_ident.table_name.clone(),
tb_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;
let cur_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
Expand Down Expand Up @@ -4750,6 +4760,7 @@ impl SchemaApiTestSuite {
table_name: tbl_name.to_string(),
tb_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;
let cur_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
Expand Down Expand Up @@ -4808,6 +4819,7 @@ impl SchemaApiTestSuite {
table_name: tbl_name.to_string(),
tb_id: tb_info.ident.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;
let cur_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
Expand Down Expand Up @@ -4910,6 +4922,7 @@ impl SchemaApiTestSuite {
table_name: tbl_name.to_string(),
tb_id: new_tb_info.ident.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
};

let old_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
Expand Down Expand Up @@ -7677,6 +7690,7 @@ where MT: SchemaApi + kvapi::AsKVApi<Error = MetaError>
db_id: self.db_id,
tb_id: self.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
};
self.mt.drop_table_by_id(req.clone()).await?;

Expand Down
2 changes: 2 additions & 0 deletions src/meta/api/src/share_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ impl ShareApiTestSuite {
tb_id: table_id,
db_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
};
let res = mt.drop_table_by_id(plan).await?;
let (share_db_id, share_specs) = res.spec_vec.unwrap();
Expand Down Expand Up @@ -2468,6 +2469,7 @@ impl ShareApiTestSuite {
tb_id: table_id,
db_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
};
let _res = mt.drop_table_by_id(plan).await;

Expand Down
2 changes: 2 additions & 0 deletions src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,8 @@ pub struct DropTableByIdReq {
pub db_id: MetaId,

pub engine: String,

pub session_id: String,
}

impl DropTableByIdReq {
Expand Down
1 change: 1 addition & 0 deletions src/meta/binaries/metabench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ async fn benchmark_table(client: &Arc<ClientHandle>, prefix: u64, client_num: u6
table_name: table_name(),
tb_id: t.ident.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await;

Expand Down
1 change: 1 addition & 0 deletions src/query/ee/src/stream/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ impl StreamHandler for RealStreamHandler {
tb_id: table.get_id(),
db_id: db.get_db_info().database_id.db_id,
engine: engine.to_string(),
session_id: "".to_string(),
})
.await
} else if plan.if_exists {
Expand Down
6 changes: 6 additions & 0 deletions src/query/service/src/interpreters/interpreter_table_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use databend_common_storages_stream::stream_table::STREAM_ENGINE;
use databend_common_storages_view::view_table::VIEW_ENGINE;
use databend_common_users::RoleCacheManager;
use databend_common_users::UserApiProvider;
use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX;

use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -126,6 +127,11 @@ impl Interpreter for DropTableInterpreter {
tb_id: tbl.get_table_info().ident.table_id,
db_id: db.get_db_info().database_id.db_id,
engine: tbl.engine().to_string(),
session_id: tbl
.options()
.get(OPT_KEY_TEMP_PREFIX)
.cloned()
.unwrap_or_default(),
})
.await?;

Expand Down
6 changes: 6 additions & 0 deletions src/query/service/src/interpreters/interpreter_view_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use databend_common_meta_app::schema::DropTableByIdReq;
use databend_common_sql::plans::DropViewPlan;
use databend_common_storages_stream::stream_table::STREAM_ENGINE;
use databend_common_storages_view::view_table::VIEW_ENGINE;
use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX;

use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -94,6 +95,11 @@ impl Interpreter for DropViewInterpreter {
tb_id: table.get_id(),
db_id: db.get_db_info().database_id.db_id,
engine: table.engine().to_string(),
session_id: table
.options()
.get(OPT_KEY_TEMP_PREFIX)
.cloned()
.unwrap_or_default(),
})
.await?;
};
Expand Down
1 change: 1 addition & 0 deletions src/query/service/tests/it/catalogs/database_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ async fn test_catalogs_table() -> Result<()> {
tb_id: tbl.get_table_info().ident.table_id,
db_id: db.get_db_info().database_id.db_id,
engine: tbl.engine().to_string(),
session_id: "".to_string(),
})
.await;
assert!(res.is_ok());
Expand Down
8 changes: 7 additions & 1 deletion src/query/storages/common/blocks/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,11 @@ use parking_lot::RwLock;
/// Indexed by table id etc.
pub type InMemoryData<K> = HashMap<K, Arc<RwLock<Vec<DataBlock>>>>;

pub static IN_MEMORY_DATA: LazyLock<Arc<RwLock<InMemoryData<u64>>>> =
pub static IN_MEMORY_DATA: LazyLock<Arc<RwLock<InMemoryData<InMemoryDataKey>>>> =
LazyLock::new(|| Arc::new(Default::default()));

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct InMemoryDataKey {
pub temp_prefix: Option<String>,
pub table_id: u64,
}
15 changes: 12 additions & 3 deletions src/query/storages/common/session/src/temp_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use databend_common_meta_app::schema::UpsertTableOptionReply;
use databend_common_meta_app::schema::UpsertTableOptionReq;
use databend_common_meta_types::SeqV;
use databend_common_storage::DataOperator;
use databend_storages_common_blocks::memory::InMemoryDataKey;
use databend_storages_common_blocks::memory::IN_MEMORY_DATA;
use databend_storages_common_table_meta::meta::parse_storage_prefix;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
Expand Down Expand Up @@ -349,8 +350,12 @@ pub async fn drop_table_by_id(
return Ok(None);
}
}
let key = InMemoryDataKey {
temp_prefix: Some(req.session_id.clone()),
table_id: *tb_id,
};
let mut in_mem_data = IN_MEMORY_DATA.write();
in_mem_data.remove(tb_id).ok_or_else(|| {
in_mem_data.remove(&key).ok_or_else(|| {
ErrorCode::Internal(format!(
"Table not found in memory data {:?}, drop table request: {:?}",
in_mem_data, req
Expand All @@ -362,7 +367,7 @@ pub async fn drop_table_by_id(
Ok(Some(DropTableReply { spec_vec: None }))
}

pub async fn drop_all_temp_tables(mgr: TempTblMgrRef) -> Result<()> {
pub async fn drop_all_temp_tables(session_id: &str, mgr: TempTblMgrRef) -> Result<()> {
let (fuse_dirs, mem_tbl_ids) = {
let mut guard = mgr.lock();
let mut fuse_dirs = Vec::new();
Expand All @@ -389,7 +394,11 @@ pub async fn drop_all_temp_tables(mgr: TempTblMgrRef) -> Result<()> {
if !mem_tbl_ids.is_empty() {
let mut in_mem_data = IN_MEMORY_DATA.write();
for id in mem_tbl_ids {
in_mem_data.remove(&id);
let key = InMemoryDataKey {
temp_prefix: Some(session_id.to_string()),
table_id: id,
};
in_mem_data.remove(&key);
}
}
Ok(())
Expand Down
13 changes: 10 additions & 3 deletions src/query/storages/memory/src/memory_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ use databend_common_pipeline_sinks::Sinker;
use databend_common_pipeline_sources::SyncSource;
use databend_common_pipeline_sources::SyncSourcer;
use databend_common_storage::StorageMetrics;
use databend_storages_common_blocks::memory::InMemoryDataKey;
use databend_storages_common_blocks::memory::IN_MEMORY_DATA;
use databend_storages_common_table_meta::meta::SnapshotId;
use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX;
use parking_lot::Mutex;
use parking_lot::RwLock;

Expand All @@ -65,13 +67,18 @@ pub struct MemoryTable {

impl MemoryTable {
pub fn try_create(table_info: TableInfo) -> Result<Box<dyn Table>> {
let table_id = &table_info.ident.table_id;
let table_id = table_info.ident.table_id;
let temp_prefix = table_info.options().get(OPT_KEY_TEMP_PREFIX).cloned();
let blocks = {
let mut in_mem_data = IN_MEMORY_DATA.write();
let x = in_mem_data.get(table_id);
let key = InMemoryDataKey {
temp_prefix,
table_id,
};
let x = in_mem_data.get(&key);
x.cloned().unwrap_or_else(|| {
let blocks = Arc::new(RwLock::new(vec![]));
in_mem_data.insert(*table_id, blocks.clone());
in_mem_data.insert(key, blocks.clone());
blocks
})
};
Expand Down
Loading