Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): support mutation during insertion #8205

Merged
merged 15 commits into the base branch from the contributor's branch
Oct 19, 2022
7 changes: 5 additions & 2 deletions src/query/catalog/src/table_mutator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_exception::Result;
use common_meta_app::schema::TableInfo;

use crate::table::Table;

/// A two-phase table mutation: first select the blocks to rewrite,
/// then attempt to commit the result against the (possibly changed) table.
#[async_trait::async_trait]
pub trait TableMutator: Send + Sync {
    /// Selects the blocks this mutation will operate on.
    /// Returns `Ok(false)` when there is nothing to do.
    async fn blocks_select(&mut self) -> Result<bool>;

    /// Tries to commit the mutation against `table`. Taking the table handle
    /// (rather than a bare `TableInfo`) lets the mutator re-read the latest
    /// snapshot and resolve conflicts with concurrent insertions.
    async fn try_commit(&self, table: Arc<dyn Table>) -> Result<()>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl Interpreter for OptimizeTableInterpreter {
executor.execute()?;
drop(executor);

mutator.try_commit(table.get_table_info()).await?;
mutator.try_commit(table.clone()).await?;
}

if do_purge {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl Interpreter for ReclusterTableInterpreter {
executor.execute()?;
drop(executor);

mutator.try_commit(table.get_table_info()).await?;
mutator.try_commit(table).await?;

if !plan.is_final {
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_base::base::tokio;
use common_catalog::table::Table;
use common_catalog::table_mutator::TableMutator;
use common_exception::ErrorCode;
use common_exception::Result;
use common_storages_fuse::FuseTable;
use databend_query::pipelines::executor::ExecutorSettings;
use databend_query::pipelines::executor::PipelineCompleteExecutor;
use databend_query::sessions::QueryContext;
use databend_query::sessions::TableContext;
use databend_query::sql::plans::Plan;
use databend_query::sql::Planner;

use crate::storages::fuse::table_test_fixture::execute_command;
use crate::storages::fuse::table_test_fixture::execute_query;
use crate::storages::fuse::table_test_fixture::expects_ok;
use crate::storages::fuse::table_test_fixture::TestFixture;

#[tokio::test]
async fn test_compact() -> Result<()> {
    // Fresh fixture with a plain (non-clustered) table.
    let fixture = TestFixture::new().await;
    let ctx = fixture.ctx();
    let tbl_name = fixture.default_table_name();
    let db_name = fixture.default_db_name();

    fixture.create_normal_table().await?;

    // Seed nine single-row blocks so compaction has something to merge.
    for id in 0..9 {
        let sql = format!("insert into {}.{}(id) values({})", db_name, tbl_name, id);
        execute_command(ctx.clone(), sql.as_str()).await?;
    }

    // Build and run a compact mutator over the current snapshot.
    let catalog = ctx.get_catalog(fixture.default_catalog_name().as_str())?;
    let table = catalog
        .get_table(ctx.get_tenant().as_str(), &db_name, &tbl_name)
        .await?;
    let mutator = build_mutator(ctx.clone(), table.clone()).await?;

    // With no concurrent writers the commit must succeed.
    mutator.try_commit(table).await?;

    // After compaction the blocks collapse into a single segment/block.
    let expected = vec![
        "+---------------+-------+",
        "| segment_count | count |",
        "+---------------+-------+",
        "| 1 | 1 |",
        "+---------------+-------+",
    ];
    let sql = format!(
        "select segment_count, block_count as count from fuse_snapshot('{}', '{}') limit 1",
        db_name, tbl_name
    );
    expects_ok(
        "check segment and block count",
        execute_query(fixture.ctx(), sql.as_str()).await,
        expected,
    )
    .await?;
    Ok(())
}

#[tokio::test]
async fn test_compact_resolved_conflict() -> Result<()> {
    // Fresh fixture with a plain (non-clustered) table.
    let fixture = TestFixture::new().await;
    let ctx = fixture.ctx();
    let tbl_name = fixture.default_table_name();
    let db_name = fixture.default_db_name();

    fixture.create_normal_table().await?;

    // Seed nine single-row blocks before taking the compaction snapshot.
    for id in 0..9 {
        let sql = format!("insert into {}.{}(id) values({})", db_name, tbl_name, id);
        execute_command(ctx.clone(), sql.as_str()).await?;
    }

    // Build the compact mutator against the current snapshot.
    let catalog = ctx.get_catalog(fixture.default_catalog_name().as_str())?;
    let table = catalog
        .get_table(ctx.get_tenant().as_str(), &db_name, &tbl_name)
        .await?;
    let mutator = build_mutator(ctx.clone(), table.clone()).await?;

    // Concurrent insertion after the mutator captured its snapshot: this
    // conflict is resolvable, so the commit below must still succeed.
    let sql = format!("insert into {}.{}(id) values(10)", db_name, tbl_name);
    execute_command(ctx.clone(), sql.as_str()).await?;

    mutator.try_commit(table).await?;

    // Expect the compacted segment plus the concurrently inserted one.
    let expected = vec![
        "+---------------+-------+",
        "| segment_count | count |",
        "+---------------+-------+",
        "| 2 | 2 |",
        "+---------------+-------+",
    ];
    let sql = format!(
        "select segment_count, block_count as count from fuse_snapshot('{}', '{}') limit 1",
        db_name, tbl_name
    );
    expects_ok(
        "check segment and block count",
        execute_query(fixture.ctx(), sql.as_str()).await,
        expected,
    )
    .await?;
    Ok(())
}

/// A concurrent DELETE removes data the compactor planned to rewrite, which
/// the mutator cannot reconcile: `try_commit` must fail with a storage error.
#[tokio::test]
async fn test_compact_unresolved_conflict() -> Result<()> {
    let fixture = TestFixture::new().await;
    let ctx = fixture.ctx();
    let tbl_name = fixture.default_table_name();
    let db_name = fixture.default_db_name();

    fixture.create_normal_table().await?;

    // Seed nine single-row blocks before taking the compaction snapshot.
    for i in 0..9 {
        let qry = format!("insert into {}.{}(id) values({})", db_name, tbl_name, i);
        execute_command(ctx.clone(), qry.as_str()).await?;
    }

    // Build the compact mutator against the current snapshot.
    let catalog = ctx.get_catalog(fixture.default_catalog_name().as_str())?;
    let table = catalog
        .get_table(ctx.get_tenant().as_str(), &db_name, &tbl_name)
        .await?;
    let mutator = build_mutator(ctx.clone(), table.clone()).await?;

    // Concurrently delete a row the compactor is about to rewrite.
    let query = format!("delete from {}.{} where id=1", db_name, tbl_name);
    let mut planner = Planner::new(ctx.clone());
    let (plan, _, _) = planner.plan_sql(&query).await?;
    // Fail loudly if planning produced something unexpected, instead of
    // silently skipping the delete and tripping the assertion below.
    // Note: `delete` is owned here, so no clone is needed before unboxing.
    match plan {
        Plan::Delete(delete) => table.delete(ctx.clone(), *delete).await?,
        _ => unreachable!("a DELETE statement must plan to Plan::Delete"),
    }

    // The commit must be rejected with a storage-level conflict error.
    let r = mutator.try_commit(table).await;
    assert!(r.is_err());
    assert_eq!(r.err().unwrap().code(), ErrorCode::storage_other_code());

    Ok(())
}

/// Builds a compact mutator for `table`, runs the compaction pipeline to
/// completion, and returns the mutator ready for a `try_commit` call.
///
/// # Panics
/// Panics if `compact` decides there is nothing to compact — the callers in
/// this file always insert enough small blocks beforehand.
async fn build_mutator(
    ctx: Arc<QueryContext>,
    table: Arc<dyn Table>,
) -> Result<Arc<dyn TableMutator>> {
    let fuse_table = FuseTable::try_from_table(table.as_ref())?;
    let settings = ctx.get_settings();
    // Single-threaded execution keeps the tests deterministic.
    settings.set_max_threads(1)?;
    let mut pipeline = common_pipeline_core::Pipeline::create();
    // `expect` replaces the former `assert!(is_some())` + `unwrap()` pair
    // and states the invariant in the panic message.
    let mutator = fuse_table
        .compact(ctx.clone(), &mut pipeline)
        .await?
        .expect("compact should produce a mutator for a table with many small blocks");
    pipeline.set_max_threads(1);
    let executor_settings = ExecutorSettings::try_create(&settings)?;
    let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
    // NOTE(review): a weak handle is registered on the context — presumably so
    // the session can observe/abort the running pipeline; confirm against
    // `QueryContext::set_executor`.
    ctx.set_executor(Arc::downgrade(&executor.get_inner()));
    executor.execute()?;
    // Drop the executor before handing back the mutator so pipeline resources
    // are released prior to the commit attempt (mirrors the interpreters).
    drop(executor);
    Ok(mutator)
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,13 @@ async fn test_deletion_mutator_multiple_empty_segments() -> Result<()> {
}
}

let new_snapshot = mutator.into_new_snapshot().await?;
let (segments, _, _) = mutator.generate_segments().await?;

// half segments left after deletion
assert_eq!(new_snapshot.segments.len(), 50);
assert_eq!(segments.len(), 50);

// new_segments should be a subset of test_segments in our case (no partial deletion of segment)
let new_segments = HashSet::<_, RandomState>::from_iter(new_snapshot.segments.into_iter());
let new_segments = HashSet::<_, RandomState>::from_iter(segments.into_iter());
let test_segments = HashSet::from_iter(test_segment_locations.into_iter());
assert!(new_segments.is_subset(&test_segments));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod compact_mutator;
mod deletion_mutator;
mod recluster_mutator;
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ use common_fuse_meta::meta::ColumnStatistics;
use common_legacy_planners::Extras;
use common_legacy_planners::Projection;
use common_storages_fuse::ColumnLeaves;
use databend_query::interpreters::CreateTableInterpreterV2;
use databend_query::interpreters::Interpreter;
use databend_query::storages::fuse::ColumnLeaf;
use databend_query::storages::fuse::FuseTable;
use futures::TryStreamExt;
Expand Down Expand Up @@ -146,9 +144,7 @@ async fn test_fuse_table_exact_statistic() -> Result<()> {
let fixture = TestFixture::new().await;
let ctx = fixture.ctx();

let create_table_plan = fixture.default_crate_table_plan();
let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?;
interpreter.execute(ctx.clone()).await?;
fixture.create_default_table().await?;

let mut table = fixture.latest_default_table().await?;

Expand Down
20 changes: 6 additions & 14 deletions src/query/service/tests/it/storages/fuse/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ async fn test_fuse_table_normal_case() -> Result<()> {
let fixture = TestFixture::new().await;
let ctx = fixture.ctx();

let create_table_plan = fixture.default_crate_table_plan();
let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?;
interpreter.execute(ctx.clone()).await?;
fixture.create_default_table().await?;

let mut table = fixture.latest_default_table().await?;

Expand Down Expand Up @@ -172,9 +170,7 @@ async fn test_fuse_table_truncate() -> Result<()> {
let fixture = TestFixture::new().await;
let ctx = fixture.ctx();

let create_table_plan = fixture.default_crate_table_plan();
let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?;
interpreter.execute(ctx.clone()).await?;
fixture.create_default_table().await?;

let table = fixture.latest_default_table().await?;

Expand Down Expand Up @@ -235,15 +231,10 @@ async fn test_fuse_table_truncate() -> Result<()> {
async fn test_fuse_table_optimize() -> Result<()> {
let fixture = TestFixture::new().await;
let ctx = fixture.ctx();
let mut planner = Planner::new(ctx.clone());
let tbl_name = fixture.default_table_name();
let db_name = fixture.default_db_name();

let create_table_plan = fixture.create_normal_table_plan();

// create test table
let tbl_name = create_table_plan.table.clone();
let db_name = create_table_plan.database.clone();
let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?;
interpreter.execute(ctx.clone()).await?;
fixture.create_normal_table().await?;

// insert 5 times
let n = 5;
Expand All @@ -266,6 +257,7 @@ async fn test_fuse_table_optimize() -> Result<()> {
// do compact
let query = format!("optimize table {}.{} compact", db_name, tbl_name);

let mut planner = Planner::new(ctx.clone());
let (plan, _, _) = planner.plan_sql(&query).await?;
let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ use common_datavalues::prelude::*;
use common_exception::ErrorCode;
use common_exception::Result;
use common_legacy_expression::*;
use databend_query::interpreters::CreateTableInterpreterV2;
use databend_query::interpreters::Interpreter;
use tokio_stream::StreamExt;

use crate::storages::fuse::table_test_fixture::*;
Expand All @@ -32,9 +30,7 @@ async fn test_clustering_information_table_read() -> Result<()> {
let ctx = fixture.ctx();

// test db & table
let create_table_plan = fixture.default_crate_table_plan();
let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?;
interpreter.execute(ctx.clone()).await?;
fixture.create_default_table().await?;

// func args
let arg_db = LegacyExpression::create_literal(DataValue::String(db.as_bytes().to_vec()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use common_base::base::tokio;
use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;
use databend_query::interpreters::CreateTableInterpreterV2;
use databend_query::interpreters::Interpreter;
use tokio_stream::StreamExt;

use crate::storages::fuse::table_test_fixture::*;
Expand All @@ -30,9 +28,7 @@ async fn test_fuse_block_table() -> Result<()> {
let ctx = fixture.ctx();

// test db & table
let create_table_plan = fixture.default_crate_table_plan();
let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?;
interpreter.execute(ctx.clone()).await?;
fixture.create_default_table().await?;

{
let expected = vec![
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ use common_datavalues::prelude::*;
use common_exception::ErrorCode;
use common_exception::Result;
use common_legacy_expression::*;
use databend_query::interpreters::CreateTableInterpreterV2;
use databend_query::interpreters::Interpreter;
use tokio_stream::StreamExt;

use crate::storages::fuse::table_test_fixture::*;
Expand Down Expand Up @@ -82,9 +80,7 @@ async fn test_fuse_snapshot_table_read() -> Result<()> {
let ctx = fixture.ctx();

// test db & table
let create_table_plan = fixture.default_crate_table_plan();
let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?;
interpreter.execute(ctx.clone()).await?;
fixture.create_default_table().await?;

{
let expected = vec![
Expand Down
10 changes: 9 additions & 1 deletion src/query/service/tests/it/storages/fuse/table_test_fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl TestFixture {
}

// create a normal table without cluster key.
pub fn create_normal_table_plan(&self) -> CreateTablePlanV2 {
pub fn normal_create_table_plan(&self) -> CreateTablePlanV2 {
CreateTablePlanV2 {
if_not_exists: false,
tenant: self.default_tenant(),
Expand Down Expand Up @@ -194,6 +194,14 @@ impl TestFixture {
Ok(())
}

/// Creates the fixture's normal (non-clustered) test table by planning and
/// executing a CREATE TABLE through the v2 interpreter.
pub async fn create_normal_table(&self) -> Result<()> {
    let plan = self.normal_create_table_plan();
    CreateTableInterpreterV2::try_create(self.ctx.clone(), plan)?
        .execute(self.ctx.clone())
        .await?;
    Ok(())
}

pub fn gen_sample_blocks(num: usize, start: i32) -> Vec<Result<DataBlock>> {
Self::gen_sample_blocks_ex(num, 3, start)
}
Expand Down
Loading