diff --git a/common/exception/src/exception_code.rs b/common/exception/src/exception_code.rs index 1d226e6f2862..045474a0132a 100644 --- a/common/exception/src/exception_code.rs +++ b/common/exception/src/exception_code.rs @@ -161,6 +161,8 @@ build_exceptions! { TableVersionMismatched(2009), OCCRetryFailure(2011), + TableNotWritable(2012), + TableHistoricalDataNotFound(2013), // User api error codes. UnknownUser(2201), diff --git a/query/src/storages/fuse/fuse_table.rs b/query/src/storages/fuse/fuse_table.rs index 35953bf40fe8..b54f06fc8068 100644 --- a/query/src/storages/fuse/fuse_table.rs +++ b/query/src/storages/fuse/fuse_table.rs @@ -53,10 +53,16 @@ pub struct FuseTable { pub(crate) meta_location_generator: TableMetaLocationGenerator, pub(crate) order_keys: Vec, + pub(crate) read_only: bool, } impl FuseTable { pub fn try_create(_ctx: StorageContext, table_info: TableInfo) -> Result> { + let r = Self::do_create(table_info, false)?; + Ok(r) + } + + pub fn do_create(table_info: TableInfo, read_only: bool) -> Result> { let storage_prefix = Self::parse_storage_prefix(&table_info)?; let mut order_keys = Vec::new(); if let Some(order) = &table_info.meta.order_keys { @@ -67,6 +73,7 @@ impl FuseTable { table_info, order_keys, meta_location_generator: TableMetaLocationGenerator::with_prefix(storage_prefix), + read_only, })) } @@ -116,7 +123,7 @@ impl FuseTable { pub fn snapshot_format_version(&self) -> u64 { match self.snapshot_loc() { - Some(loc) => TableMetaLocationGenerator::snaphost_version(loc.as_str()), + Some(loc) => TableMetaLocationGenerator::snapshot_version(loc.as_str()), None => { // No snapshot location here, indicates that there are no data of this table yet // in this case, we just returns the current snapshot version @@ -143,6 +150,17 @@ impl FuseTable { )) }) } + + pub fn check_mutable(&self) -> Result<()> { + if self.read_only { + Err(ErrorCode::TableNotWritable(format!( + "Table {} is in read-only mode", + self.table_info.desc.as_str() + ))) + } else { + Ok(()) + } + } } #[async_trait::async_trait] @@ -182,7 +200,7 @@ impl Table for FuseTable { ctx: Arc, plan: &ReadDataSourcePlan, ) -> Result { - self.do_read(ctx, &plan.push_downs).await + self.do_read(ctx, &plan.push_downs) } #[tracing::instrument(level = "debug", name = "fuse_table_read2", skip(self, ctx, pipeline), fields(ctx.id = ctx.get_id().as_str()))] @@ -196,6 +214,7 @@ impl Table for FuseTable { } fn append2(&self, ctx: Arc, pipeline: &mut NewPipeline) -> Result<()> { + self.check_mutable()?; self.do_append2(ctx, pipeline) } @@ -205,6 +224,7 @@ impl Table for FuseTable { ctx: Arc, stream: SendableDataBlockStream, ) -> Result { + self.check_mutable()?; let log_entry_stream = self.append_chunks(ctx, stream).await?; let data_block_stream = log_entry_stream.map(|append_log_entry_res| match append_log_entry_res { @@ -221,6 +241,7 @@ impl Table for FuseTable { operations: Vec, overwrite: bool, ) -> Result<()> { + self.check_mutable()?; // only append operation supported currently let append_log_entries = operations .iter() @@ -235,10 +256,12 @@ impl Table for FuseTable { ctx: Arc, truncate_plan: TruncateTablePlan, ) -> Result<()> { + self.check_mutable()?; self.do_truncate(ctx, truncate_plan).await } async fn optimize(&self, ctx: Arc, keep_last_snapshot: bool) -> Result<()> { + self.check_mutable()?; self.do_optimize(ctx, keep_last_snapshot).await } diff --git a/query/src/storages/fuse/io/locations.rs b/query/src/storages/fuse/io/locations.rs index 3936f8692095..5a577a32313a 100644 --- a/query/src/storages/fuse/io/locations.rs +++ b/query/src/storages/fuse/io/locations.rs @@ -66,11 +66,11 @@ impl TableMetaLocationGenerator { } pub fn snapshot_location_from_uuid(&self, id: &Uuid, version: u64) -> Result { - let snaphost_version = SnapshotVersion::try_from(version)?; - Ok(snaphost_version.create(id, &self.prefix)) + let snapshot_version = SnapshotVersion::try_from(version)?; + Ok(snapshot_version.create(id, &self.prefix)) } - pub fn snaphost_version(location: impl AsRef) -> u64 { + pub fn snapshot_version(location: impl AsRef) -> u64 { if location.as_ref().ends_with(SNAPHOST_V1.suffix()) { SNAPHOST_V1.version() } else { diff --git a/query/src/storages/fuse/io/read/meta_readers.rs b/query/src/storages/fuse/io/read/meta_readers.rs index d36df5facdd8..71b7ced109f8 100644 --- a/query/src/storages/fuse/io/read/meta_readers.rs +++ b/query/src/storages/fuse/io/read/meta_readers.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::pin::Pin; use std::sync::Arc; use common_exception::ErrorCode; use common_exception::Result; use futures::io::BufReader; +use futures::stream; use opendal::BytesReader; use super::cached_reader::CachedReader; @@ -98,6 +100,47 @@ impl<'a> TableSnapshotReader<'a> { Ok(snapshots) } + + pub fn snapshot_history( + &'a self, + location: String, + format_version: u64, + location_gen: TableMetaLocationGenerator, + ) -> Pin>> + 'a>> { + let stream = stream::try_unfold( + (self, location_gen, Some((location, format_version))), + |(reader, gen, next)| async move { + if let Some((loc, ver)) = next { + let snapshot = match reader.read(loc, None, ver).await { + Ok(s) => Ok(Some(s)), + Err(e) => { + if e.code() == ErrorCode::storage_not_found_code() { + Ok(None) + } else { + Err(e) + } + } + }; + match snapshot { + Ok(Some(snapshot)) => { + if let Some((id, v)) = snapshot.prev_snapshot_id { + let new_ver = v; + let new_loc = gen.snapshot_location_from_uuid(&id, v)?; + Ok(Some((snapshot, (reader, gen, Some((new_loc, new_ver)))))) + } else { + Ok(Some((snapshot, (reader, gen, None)))) + } + } + Ok(None) => Ok(None), + Err(e) => Err(e), + } + } else { + Ok(None) + } + }, + ); + Box::pin(stream) + } } #[async_trait::async_trait] diff --git a/query/src/storages/fuse/meta/v1/snapshot.rs b/query/src/storages/fuse/meta/v1/snapshot.rs index 487e6d4d5527..b65669242d0d 100644 --- a/query/src/storages/fuse/meta/v1/snapshot.rs +++ b/query/src/storages/fuse/meta/v1/snapshot.rs @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Add; + +use chrono::DateTime; +use chrono::Utc; use common_datavalues::DataSchema; use serde::Deserialize; use serde::Serialize; @@ -30,6 +34,10 @@ pub struct TableSnapshot { /// id of snapshot pub snapshot_id: SnapshotId, + /// previous snapshot + pub timestamp: Option>, + + /// previous snapshot pub prev_snapshot_id: Option<(SnapshotId, FormatVersion)>, /// For each snapshot, we keep a schema for it (in case of schema evolution) @@ -48,14 +56,26 @@ pub struct TableSnapshot { impl TableSnapshot { pub fn new( snapshot_id: SnapshotId, + prev_timestamp: &Option>, prev_snapshot_id: Option<(SnapshotId, FormatVersion)>, schema: DataSchema, summary: Statistics, segments: Vec, ) -> Self { + // timestamp of the snapshot should always larger than the previous one's + let now = Utc::now(); + let mut timestamp = Some(now); + if let Some(prev_instant) = prev_timestamp { + if prev_instant > &now { + // if local time is smaller, use the timestamp of previous snapshot, plus 1 ms + timestamp = Some(prev_instant.add(chrono::Duration::milliseconds(1))) + } + }; + Self { format_version: TableSnapshot::VERSION, snapshot_id, + timestamp, prev_snapshot_id, schema, summary, @@ -75,6 +95,7 @@ impl From for TableSnapshot { Self { format_version: TableSnapshot::VERSION, snapshot_id: s.snapshot_id, + timestamp: None, prev_snapshot_id: s.prev_snapshot_id.map(|id| (id, 0)), schema: s.schema, summary: s.summary, diff --git a/query/src/storages/fuse/operations/commit.rs b/query/src/storages/fuse/operations/commit.rs index d06579d628b2..065b54891aa1 100644 --- a/query/src/storages/fuse/operations/commit.rs +++ b/query/src/storages/fuse/operations/commit.rs @@ -146,6 +146,7 @@ impl FuseTable { ) -> Result<()> { let prev = self.read_table_snapshot(ctx).await?; let prev_version = self.snapshot_format_version(); + let prev_timestamp = prev.as_ref().and_then(|v| v.timestamp); let schema = self.table_info.meta.schema.as_ref().clone(); let (segments, summary) = Self::merge_append_operations(operation_log)?; @@ -162,6 +163,7 @@ impl FuseTable { let new_snapshot = if overwrite { TableSnapshot::new( Uuid::new_v4(), + &prev_timestamp, prev.as_ref().map(|v| (v.snapshot_id, prev_version)), schema, summary, @@ -228,6 +230,7 @@ impl FuseTable { statistics }; let prev_snapshot_id = previous.as_ref().map(|v| (v.snapshot_id, prev_version)); + let prev_snapshot_timestamp = previous.as_ref().and_then(|v| v.timestamp); // 2. merge segment locations with previous snapshot, if any if let Some(snapshot) = &previous { @@ -237,6 +240,7 @@ impl FuseTable { let new_snapshot = TableSnapshot::new( Uuid::new_v4(), + &prev_snapshot_timestamp, prev_snapshot_id, schema.clone(), stats, diff --git a/query/src/storages/fuse/operations/mod.rs b/query/src/storages/fuse/operations/mod.rs index 583c3e71156d..475046f64634 100644 --- a/query/src/storages/fuse/operations/mod.rs +++ b/query/src/storages/fuse/operations/mod.rs @@ -15,6 +15,7 @@ mod append; mod commit; mod fuse_sink; +mod navigate; mod operation_log; mod optimize; mod read; diff --git a/query/src/storages/fuse/operations/navigate.rs b/query/src/storages/fuse/operations/navigate.rs new file mode 100644 index 000000000000..df46900e505e --- /dev/null +++ b/query/src/storages/fuse/operations/navigate.rs @@ -0,0 +1,117 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use std::sync::Arc; + +use chrono::DateTime; +use chrono::Utc; +use common_exception::ErrorCode; +use common_exception::Result; +use common_meta_types::TableStatistics; +use futures::TryStreamExt; + +use crate::sessions::QueryContext; +use crate::sql::OPT_KEY_SNAPSHOT_LOCATION; +use crate::storages::fuse::io::MetaReaders; +use crate::storages::fuse::FuseTable; + +impl FuseTable { + pub async fn navigate( + &self, + ctx: &Arc, + time_point: DateTime, + ) -> Result> { + let snapshot_location = if let Some(loc) = self.snapshot_loc() { + loc + } else { + // not an error? + return Err(ErrorCode::TableHistoricalDataNotFound( + "Empty Table has no historical data", + )); + }; + + let snapshot_version = self.snapshot_format_version(); + let reader = MetaReaders::table_snapshot_reader(ctx); + + // grab the table history + // snapshots are order by timestamp DESC. + let mut snapshots = reader.snapshot_history( + snapshot_location, + snapshot_version, + self.meta_location_generator().clone(), + ); + + // Find the instant which matches ths given `time_point`. + let mut instant = None; + while let Some(snapshot) = snapshots.try_next().await? { + if let Some(ts) = snapshot.timestamp { + // break on the first one + if ts <= time_point { + instant = Some(snapshot); + break; + } + } + } + + if let Some(snapshot) = instant { + // Load the table instance by the snapshot + + // The `seq` of ident that we cloned here is JUST a place holder + // we should NOT use it other than a pure place holder. + // Fortunately, historical table should be read-only. + // - Although, caller of fuse table will not perform mutation on a historical table + // but in case there are careless mistakes, an extra attribute `read_only` is + // added the FuseTable, and during mutation operations, FuseTable will check it. + // - Figuring out better way... + let mut table_info = self.table_info.clone(); + + // There are more to be kept in snapshot, like engine_options, ordering keys... + // or we could just keep a clone of TableMeta in the snapshot. + // + // currently, here are what we can recovery from the snapshot: + + // 1. the table schema + table_info.meta.schema = Arc::new(snapshot.schema.clone()); + + // 2. the table option `snapshot_location` + let ver = snapshot.format_version(); + let loc = self + .meta_location_generator + .snapshot_location_from_uuid(&snapshot.snapshot_id, ver)?; + table_info + .meta + .options + .insert(OPT_KEY_SNAPSHOT_LOCATION.to_owned(), loc); + + // 3. The statistics + let summary = &snapshot.summary; + table_info.meta.statistics = TableStatistics { + number_of_rows: summary.row_count, + data_bytes: summary.uncompressed_byte_size, + compressed_data_bytes: summary.compressed_byte_size, + index_data_bytes: 0, // we do not have it yet + }; + + // let's instantiate it + let read_only = true; + let fuse_tbl = FuseTable::do_create(table_info, read_only)?; + Ok(fuse_tbl.into()) + } else { + Err(ErrorCode::TableHistoricalDataNotFound( + "No historical data found", + )) + } + } +} diff --git a/query/src/storages/fuse/operations/read.rs b/query/src/storages/fuse/operations/read.rs index 0d8e2d12fb07..7da4b6d25c37 100644 --- a/query/src/storages/fuse/operations/read.rs +++ b/query/src/storages/fuse/operations/read.rs @@ -40,7 +40,7 @@ use crate::storages::fuse::FuseTable; impl FuseTable { #[inline] - pub async fn do_read( + pub fn do_read( &self, ctx: Arc, push_downs: &Option, diff --git a/query/src/storages/fuse/operations/truncate.rs b/query/src/storages/fuse/operations/truncate.rs index 347db1199bdd..881d0bad3d60 100644 --- a/query/src/storages/fuse/operations/truncate.rs +++ b/query/src/storages/fuse/operations/truncate.rs @@ -36,6 +36,7 @@ impl FuseTable { let new_snapshot = TableSnapshot::new( Uuid::new_v4(), + &prev_snapshot.timestamp, Some((prev_id, prev_snapshot.format_version())), prev_snapshot.schema.clone(), Default::default(), diff --git a/query/src/storages/storage_table.rs b/query/src/storages/storage_table.rs index f25f53518a68..9d2174d192cc 100644 --- a/query/src/storages/storage_table.rs +++ b/query/src/storages/storage_table.rs @@ -150,6 +150,7 @@ pub trait Table: Sync + Send { } } +#[derive(Debug)] pub struct TableStatistics { pub num_rows: Option, pub data_size: Option, diff --git a/query/tests/it/storages/fuse/meta/mod.rs b/query/tests/it/storages/fuse/meta/mod.rs new file mode 100644 index 000000000000..c0e4f37ca107 --- /dev/null +++ b/query/tests/it/storages/fuse/meta/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod snapshot; diff --git a/query/tests/it/storages/fuse/meta/snapshot.rs b/query/tests/it/storages/fuse/meta/snapshot.rs new file mode 100644 index 000000000000..d8485aaaff76 --- /dev/null +++ b/query/tests/it/storages/fuse/meta/snapshot.rs @@ -0,0 +1,72 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Add; + +use common_datavalues::DataSchema; +use databend_query::storages::fuse::meta::TableSnapshot; +use uuid::Uuid; + +fn default_snapshot() -> TableSnapshot { + let uuid = Uuid::new_v4(); + let schema = DataSchema::empty(); + let stats = Default::default(); + TableSnapshot::new(uuid, &None, None, schema, stats, vec![]) +} + +#[test] +fn snapshot_timestamp_is_some() { + let s = default_snapshot(); + assert!(s.timestamp.is_some()); +} + +#[test] +fn snapshot_timestamp_monotonic_increase() { + let prev = default_snapshot(); + let schema = DataSchema::empty(); + let uuid = Uuid::new_v4(); + let current = TableSnapshot::new( + uuid, + &prev.timestamp, + prev.prev_snapshot_id.clone(), + schema, + Default::default(), + vec![], + ); + let current_ts = current.timestamp.unwrap(); + let prev_ts = prev.timestamp.unwrap(); + assert!(current_ts > prev_ts) +} + +#[test] +fn snapshot_timestamp_time_skew_tolerance() { + let mut prev = default_snapshot(); + let schema = DataSchema::empty(); + let uuid = Uuid::new_v4(); + + // simulating a stalled clock + prev.timestamp = Some(prev.timestamp.unwrap().add(chrono::Duration::days(1))); + + let current = TableSnapshot::new( + uuid, + &prev.timestamp, + prev.prev_snapshot_id.clone(), + schema, + Default::default(), + vec![], + ); + let current_ts = current.timestamp.unwrap(); + let prev_ts = prev.timestamp.unwrap(); + assert!(current_ts > prev_ts) +} diff --git a/query/tests/it/storages/fuse/mod.rs b/query/tests/it/storages/fuse/mod.rs index 214bc76c069b..95ca306b7358 100644 --- a/query/tests/it/storages/fuse/mod.rs +++ b/query/tests/it/storages/fuse/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod io; +mod meta; mod operations; mod pruning; mod statistics; diff --git a/query/tests/it/storages/fuse/operations/mod.rs b/query/tests/it/storages/fuse/operations/mod.rs index d88fd42ecb5f..84079bf009a2 100644 --- a/query/tests/it/storages/fuse/operations/mod.rs +++ b/query/tests/it/storages/fuse/operations/mod.rs @@ -14,6 +14,7 @@ // mod commit; +mod navigate; mod optimize; mod purge_drop; mod purge_truncate; diff --git a/query/tests/it/storages/fuse/operations/navigate.rs b/query/tests/it/storages/fuse/operations/navigate.rs new file mode 100644 index 000000000000..8dff51550dc6 --- /dev/null +++ b/query/tests/it/storages/fuse/operations/navigate.rs @@ -0,0 +1,202 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use std::ops::Add; +use std::ops::Sub; +use std::sync::Arc; +use std::time::Duration; + +use common_base::base::tokio; +use common_datablocks::DataBlock; +use common_datavalues::DataSchema; +use common_exception::ErrorCode; +use common_exception::Result; +use common_planners::TruncateTablePlan; +use common_streams::DataBlockStream; +use databend_query::pipelines::new::NewPipeline; +use databend_query::storages::fuse::io::MetaReaders; +use databend_query::storages::fuse::io::TableMetaLocationGenerator; +use databend_query::storages::fuse::FuseTable; +use databend_query::storages::Table; +use futures::TryStreamExt; + +use crate::storages::fuse::table_test_fixture::execute_query; +use crate::storages::fuse::table_test_fixture::TestFixture; + +#[tokio::test] +async fn test_fuse_navigate() -> Result<()> { + // - perform two insertions, which will left 2 snapshots + // - navigate to the snapshot generated by the first insertion should be success + // - navigate to the snapshot that generated before the first insertion should fail + + // 1. Setup + let fixture = TestFixture::new().await; + let db = fixture.default_db_name(); + let tbl = fixture.default_table_name(); + let ctx = fixture.ctx(); + fixture.create_default_table().await?; + + // 1.1 first commit + let qry = format!("insert into '{}'.'{}' values (1), (2) ", db, tbl); + execute_query(ctx.clone(), qry.as_str()) + .await? + .try_collect::>() + .await?; + + // keep the first snapshot of the insertion + let table = fixture.latest_default_table().await?; + let first_snapshot = FuseTable::try_from_table(table.as_ref())? + .snapshot_loc() + .unwrap(); + + // take a nap + tokio::time::sleep(Duration::from_millis(2)).await; + + // 1.2 second commit + let qry = format!("insert into '{}'.'{}' values (3) ", db, tbl); + execute_query(ctx.clone(), qry.as_str()) + .await? + .try_collect::>() + .await?; + // keep the snapshot of the second insertion + let table = fixture.latest_default_table().await?; + let second_snapshot = FuseTable::try_from_table(table.as_ref())? + .snapshot_loc() + .unwrap(); + assert_ne!(second_snapshot, first_snapshot); + + // 2. grab the history + let table = fixture.latest_default_table().await?; + let fuse_table = FuseTable::try_from_table(table.as_ref())?; + let reader = MetaReaders::table_snapshot_reader(ctx.as_ref()); + let loc = fuse_table.snapshot_loc().unwrap(); + assert_eq!(second_snapshot, loc); + let version = TableMetaLocationGenerator::snapshot_version(loc.as_str()); + let snapshots: Vec<_> = reader + .snapshot_history(loc, version, fuse_table.meta_location_generator().clone()) + .try_collect() + .await?; + + // 3. there should be two snapshots + assert_eq!(2, snapshots.len()); + eprintln!("snapshots {:?}", &snapshots); + + // 4. navigate to the first snapshot + // history is order by timestamp DESC + let latest = &snapshots[0]; + let instant = latest + .timestamp + .unwrap() + .sub(chrono::Duration::milliseconds(1)); + // navigate from the instant that is just one ms before the timestamp of the latest snapshot + let tbl = fuse_table.navigate(&ctx, instant).await?; + + // check we got the snapshot of the first insertion + assert_eq!(first_snapshot, tbl.snapshot_loc().unwrap()); + + // 4. navigate beyond the first snapshot + let first_insertion = &snapshots[1]; + let instant = first_insertion + .timestamp + .unwrap() + .sub(chrono::Duration::milliseconds(1)); + // navigate from the instant that is just one ms before the timestamp of the last insertion + let res = fuse_table.navigate(&ctx, instant).await; + match res { + Ok(_) => panic!("historical data should not exist"), + Err(e) => assert_eq!(e.code(), ErrorCode::table_historical_data_not_found_code()), + }; + Ok(()) +} + +#[tokio::test] +async fn test_fuse_historical_table_is_read_only() -> Result<()> { + // 1. Setup + let fixture = TestFixture::new().await; + let db = fixture.default_db_name(); + let tbl = fixture.default_table_name(); + let ctx = fixture.ctx(); + fixture.create_default_table().await?; + + let qry = format!("insert into '{}'.'{}' values (1)", db, tbl); + execute_query(ctx.clone(), qry.as_str()) + .await? + .try_collect::>() + .await?; + + // 2. grab the history + let table = fixture.latest_default_table().await?; + let fuse_table = FuseTable::try_from_table(table.as_ref())?; + let loc = fuse_table.snapshot_loc().unwrap(); + let reader = MetaReaders::table_snapshot_reader(ctx.as_ref()); + let version = TableMetaLocationGenerator::snapshot_version(loc.as_str()); + let snapshots: Vec<_> = reader + .snapshot_history(loc, version, fuse_table.meta_location_generator().clone()) + .try_collect() + .await?; + + let snapshot = &snapshots[0]; + let instant = snapshot + .timestamp + .unwrap() + .add(chrono::Duration::milliseconds(1)); + let tbl = fuse_table.navigate(&ctx, instant).await?; + + // check append2 + let res = tbl.append2(ctx.clone(), &mut NewPipeline::create()); + assert_not_writable(res, "append2"); + let empty_stream = Box::pin(DataBlockStream::create( + Arc::new(DataSchema::empty()), + None, + vec![], + )); + + // check append_data + let res = tbl.append_data(ctx.clone(), empty_stream).await; + assert_not_writable(res, "append_data"); + + // check commit_insertion + let res = tbl.commit_insertion(ctx.clone(), "", vec![], false).await; + assert_not_writable(res, "commit_insertion"); + + // check truncate + let res = tbl + .truncate(ctx.clone(), TruncateTablePlan { + // values do not matter + catalog: "".to_string(), + db, + table: "".to_string(), + purge: false, + }) + .await; + assert_not_writable(res, "truncate"); + + Ok(()) +} + +fn assert_not_writable(res: Result, case_name: &str) { + match res { + Ok(_) => panic!( + "historical table should NOT be writable, case {}", + case_name + ), + Err(e) => assert_eq!( + e.code(), + ErrorCode::table_not_writable_code(), + " case {}", + case_name + ), + } +}