diff --git a/src/query/ast/src/ast/format/ast_format.rs b/src/query/ast/src/ast/format/ast_format.rs index ec7408f8b6332..a6e6f15289725 100644 --- a/src/query/ast/src/ast/format/ast_format.rs +++ b/src/query/ast/src/ast/format/ast_format.rs @@ -3292,6 +3292,7 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor { fn visit_stream_point(&mut self, point: &'ast StreamPoint) { match point { StreamPoint::AtStream { database, name } => self.visit_table_ref(&None, database, name), + StreamPoint::AtPoint(point) => self.visit_time_travel_point(point), } } diff --git a/src/query/ast/src/ast/format/syntax/ddl.rs b/src/query/ast/src/ast/format/syntax/ddl.rs index e3dcc844ca0bd..7096e3227193c 100644 --- a/src/query/ast/src/ast/format/syntax/ddl.rs +++ b/src/query/ast/src/ast/format/syntax/ddl.rs @@ -335,24 +335,27 @@ pub(crate) fn pretty_create_stream(stmt: CreateStreamStmt) -> RcDoc<'static> { .append(RcDoc::text(stmt.table.to_string())), ), ) - .append( - if let Some(StreamPoint::AtStream { database, name }) = stmt.stream_point { - RcDoc::space() - .append(RcDoc::text("AT (STREAM => ")) - .append( - RcDoc::space() - .append(if let Some(database) = database { - RcDoc::text(database.to_string()).append(RcDoc::text(".")) - } else { - RcDoc::nil() - }) - .append(RcDoc::text(name.to_string())), - ) - .append(RcDoc::text(")")) - } else { - RcDoc::nil() - }, - ) + .append(match stmt.stream_point { + Some(StreamPoint::AtPoint(TimeTravelPoint::Snapshot(sid))) => { + RcDoc::text(format!(" AT (SNAPSHOT => {sid})")) + } + Some(StreamPoint::AtPoint(TimeTravelPoint::Timestamp(ts))) => { + RcDoc::text(format!(" AT (TIMESTAMP => {ts})")) + } + Some(StreamPoint::AtStream { database, name }) => RcDoc::space() + .append(RcDoc::text("AT (STREAM => ")) + .append( + RcDoc::space() + .append(if let Some(database) = database { + RcDoc::text(database.to_string()).append(RcDoc::text(".")) + } else { + RcDoc::nil() + }) + .append(RcDoc::text(name.to_string())), + ) + .append(RcDoc::text(")")), + None => RcDoc::nil(), + }) .append(if !stmt.append_only { RcDoc::space().append(RcDoc::text("APPEND_ONLY = false")) } else { diff --git a/src/query/ast/src/ast/statements/stream.rs b/src/query/ast/src/ast/statements/stream.rs index 2e77547d286e1..f310e52195879 100644 --- a/src/query/ast/src/ast/statements/stream.rs +++ b/src/query/ast/src/ast/statements/stream.rs @@ -22,6 +22,7 @@ use derive_visitor::DriveMut; use crate::ast::write_dot_separated_list; use crate::ast::Identifier; use crate::ast::ShowLimit; +use crate::ast::TimeTravelPoint; #[derive(Debug, Clone, PartialEq, Drive, DriveMut)] pub enum StreamPoint { @@ -29,6 +30,7 @@ pub enum StreamPoint { database: Option, name: Identifier, }, + AtPoint(TimeTravelPoint), } impl Display for StreamPoint { @@ -39,6 +41,7 @@ impl Display for StreamPoint { write_dot_separated_list(f, database.iter().chain(Some(name)))?; write!(f, ")") } + StreamPoint::AtPoint(point) => write!(f, " AT {}", point), } } } diff --git a/src/query/ast/src/ast/visitors/walk.rs b/src/query/ast/src/ast/visitors/walk.rs index 2604f7913c0e3..e920036b05291 100644 --- a/src/query/ast/src/ast/visitors/walk.rs +++ b/src/query/ast/src/ast/visitors/walk.rs @@ -322,6 +322,9 @@ pub fn walk_stream_point<'a, V: Visitor<'a>>(visitor: &mut V, point: &'a StreamP visitor.visit_identifier(name); } + StreamPoint::AtPoint(point) => { + visitor.visit_time_travel_point(point); + } } } diff --git a/src/query/ast/src/ast/visitors/walk_mut.rs b/src/query/ast/src/ast/visitors/walk_mut.rs index 73de0c485ba7b..f4766fa6383fa 100644 --- a/src/query/ast/src/ast/visitors/walk_mut.rs +++ b/src/query/ast/src/ast/visitors/walk_mut.rs @@ -346,6 +346,7 @@ pub fn walk_stream_point_mut(visitor: &mut V, stream: &mut Stream visitor.visit_identifier(name); } + StreamPoint::AtPoint(point) => visitor.visit_time_travel_point(point), } } diff --git a/src/query/ast/src/parser/stream.rs b/src/query/ast/src/parser/stream.rs index 3affceb9510fa..b5cb77bfc4712 100644 --- a/src/query/ast/src/parser/stream.rs +++ b/src/query/ast/src/parser/stream.rs @@ -14,7 +14,6 @@ use nom::combinator::map; -use super::statement::parse_create_option; use crate::ast::CreateStreamStmt; use crate::ast::DescribeStreamStmt; use crate::ast::DropStreamStmt; @@ -27,6 +26,8 @@ use crate::parser::common::map_res; use crate::parser::common::IResult; use crate::parser::expr::literal_bool; use crate::parser::expr::literal_string; +use crate::parser::query::travel_point; +use crate::parser::statement::parse_create_option; use crate::parser::statement::show_limit; use crate::parser::token::TokenKind::*; use crate::parser::Input; @@ -100,12 +101,16 @@ fn drop_stream(i: Input) -> IResult { } fn stream_point(i: Input) -> IResult { - let mut at_stream = map( + let at_stream = map( rule! { AT ~ "(" ~ STREAM ~ "=>" ~ #dot_separated_idents_1_to_2 ~ ")" }, |(_, _, _, _, (database, name), _)| StreamPoint::AtStream { database, name }, ); + let at_point = map(rule! { AT ~ ^#travel_point }, |(_, travel_point)| { + StreamPoint::AtPoint(travel_point) + }); + rule!( - #at_stream + #at_stream | #at_point )(i) } diff --git a/src/query/ast/tests/it/parser.rs b/src/query/ast/tests/it/parser.rs index 70f11276058e4..5e0905a57776a 100644 --- a/src/query/ast/tests/it/parser.rs +++ b/src/query/ast/tests/it/parser.rs @@ -162,6 +162,8 @@ fn test_statement() { r#"show full views from ctl.db"#, r#"create stream test2.s1 on table test.t append_only = false;"#, r#"create stream if not exists test2.s2 on table test.t at (stream => test1.s1) comment = 'this is a stream';"#, + r#"create stream if not exists test2.s3 on table test.t at (TIMESTAMP => '2023-06-26 09:49:02.038483'::TIMESTAMP) append_only = false;"#, + r#"create stream if not exists test2.s3 on table test.t at (SNAPSHOT => '9828b23f74664ff3806f44bbc1925ea5') append_only = true;"#, r#"create or replace stream test2.s1 on table test.t append_only = false;"#, r#"show full streams from default.test2 like 's%';"#, r#"describe stream test2.s2;"#, diff --git a/src/query/ast/tests/it/testdata/statement.txt b/src/query/ast/tests/it/testdata/statement.txt index bfd0defca2310..5c6dc4f82b976 100644 --- a/src/query/ast/tests/it/testdata/statement.txt +++ b/src/query/ast/tests/it/testdata/statement.txt @@ -3692,6 +3692,136 @@ CreateStream( ) +---------- Input ---------- +create stream if not exists test2.s3 on table test.t at (TIMESTAMP => '2023-06-26 09:49:02.038483'::TIMESTAMP) append_only = false; +---------- Output --------- +CREATE STREAM IF NOT EXISTS test2.s3 ON TABLE test.t AT (TIMESTAMP => '2023-06-26 09:49:02.038483'::TIMESTAMP) APPEND_ONLY = false +---------- AST ------------ +CreateStream( + CreateStreamStmt { + create_option: CreateIfNotExists, + catalog: None, + database: Some( + Identifier { + span: Some( + 28..33, + ), + name: "test2", + quote: None, + is_hole: false, + }, + ), + stream: Identifier { + span: Some( + 34..36, + ), + name: "s3", + quote: None, + is_hole: false, + }, + table_database: Some( + Identifier { + span: Some( + 46..50, + ), + name: "test", + quote: None, + is_hole: false, + }, + ), + table: Identifier { + span: Some( + 51..52, + ), + name: "t", + quote: None, + is_hole: false, + }, + stream_point: Some( + AtPoint( + Timestamp( + Cast { + span: Some( + 98..109, + ), + expr: Literal { + span: Some( + 70..98, + ), + value: String( + "2023-06-26 09:49:02.038483", + ), + }, + target_type: Timestamp, + pg_style: true, + }, + ), + ), + ), + append_only: false, + comment: None, + }, +) + + +---------- Input ---------- +create stream if not exists test2.s3 on table test.t at (SNAPSHOT => '9828b23f74664ff3806f44bbc1925ea5') append_only = true; +---------- Output --------- +CREATE STREAM IF NOT EXISTS test2.s3 ON TABLE test.t AT (SNAPSHOT => '9828b23f74664ff3806f44bbc1925ea5') +---------- AST ------------ +CreateStream( + CreateStreamStmt { + create_option: CreateIfNotExists, + catalog: None, + database: Some( + Identifier { + span: Some( + 28..33, + ), + name: "test2", + quote: None, + is_hole: false, + }, + ), + stream: Identifier { + span: Some( + 34..36, + ), + name: "s3", + quote: None, + is_hole: false, + }, + table_database: Some( + Identifier { + span: Some( + 46..50, + ), + name: "test", + quote: None, + is_hole: false, + }, + ), + table: Identifier { + span: Some( + 51..52, + ), + name: "t", + quote: None, + is_hole: false, + }, + stream_point: Some( + AtPoint( + Snapshot( + "9828b23f74664ff3806f44bbc1925ea5", + ), + ), + ), + append_only: true, + comment: None, + }, +) + + ---------- Input ---------- create or replace stream test2.s1 on table test.t append_only = false; ---------- Output --------- diff --git a/src/query/ee/src/inverted_index/indexer.rs b/src/query/ee/src/inverted_index/indexer.rs index f3f54cfe71f3c..dc32d349498b7 100644 --- a/src/query/ee/src/inverted_index/indexer.rs +++ b/src/query/ee/src/inverted_index/indexer.rs @@ -189,7 +189,10 @@ impl Indexer { write_data(index_bytes, operator, &new_index_info_loc).await?; // Generate new table snapshot file - let mut new_snapshot = TableSnapshot::from_previous(snapshot.as_ref()); + let mut new_snapshot = TableSnapshot::from_previous( + snapshot.as_ref(), + Some(fuse_table.get_table_info().ident.seq), + ); let mut index_info_locations = BTreeMap::new(); if let Some(old_index_info_locations) = &snapshot.index_info_locations { for (old_index_name, location) in old_index_info_locations { diff --git a/src/query/ee/src/stream/handler.rs b/src/query/ee/src/stream/handler.rs index b450ab376fe4d..5cd2c2ade4126 100644 --- a/src/query/ee/src/stream/handler.rs +++ b/src/query/ee/src/stream/handler.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use databend_common_base::base::GlobalInstance; use databend_common_catalog::table::Table; +use databend_common_catalog::table::TableExt; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::schema::CreateTableReply; @@ -31,6 +32,7 @@ use databend_common_meta_types::MatchSeq; use databend_common_sql::plans::CreateStreamPlan; use databend_common_sql::plans::DropStreamPlan; use databend_common_sql::plans::StreamNavigation; +use databend_common_storages_fuse::io::SnapshotsIO; use databend_common_storages_fuse::FuseTable; use databend_common_storages_fuse::TableContext; use databend_common_storages_stream::stream_table::StreamTable; @@ -61,7 +63,7 @@ impl StreamHandler for RealStreamHandler { let tenant = ctx.get_tenant(); let catalog = ctx.get_catalog(&plan.catalog).await?; - let table = catalog + let mut table = catalog .get_table(tenant.as_str(), &plan.table_database, &plan.table_name) .await?; let table_info = table.get_table_info(); @@ -72,7 +74,7 @@ impl StreamHandler for RealStreamHandler { ))); } - let table_version = table_info.ident.seq; + let mut table_version = table_info.ident.seq; let table_id = table_info.ident.table_id; let schema = table_info.schema().clone(); if !table.change_tracking_enabled() { @@ -95,10 +97,12 @@ impl StreamHandler for RealStreamHandler { catalog .upsert_table_option(tenant.as_str(), &plan.table_database, req) .await?; + // refreash table. + table = table.refresh(ctx.as_ref()).await?; + table_version = table.get_table_info().ident.seq; } - let mut options = BTreeMap::new(); - match &plan.navigation { + let (version, snapshot_location) = match &plan.navigation { Some(StreamNavigation::AtStream { database, name }) => { let stream = catalog.get_table(tenant.as_str(), database, name).await?; let stream = StreamTable::try_from_table(stream.as_ref())?; @@ -122,34 +126,70 @@ impl StreamHandler for RealStreamHandler { plan.table_database, plan.table_name ))); } - options = stream.get_table_info().options().clone(); - let stream_mode = if plan.append_only { - MODE_APPEND_ONLY + + let version = stream_opts + .get(OPT_KEY_TABLE_VER) + .ok_or_else(|| ErrorCode::IllegalStream(format!("Illegal stream '{name}'")))? + .parse::()?; + (version, stream_opts.get(OPT_KEY_SNAPSHOT_LOCATION).cloned()) + } + Some(StreamNavigation::AtPoint(point)) => { + let fuse_table = FuseTable::try_from_table(table.as_ref())?; + let source = fuse_table.navigate_to(point).await?; + if let Some(snapshot_loc) = source.snapshot_loc().await? { + let (snapshot, _) = + SnapshotsIO::read_snapshot(snapshot_loc.clone(), fuse_table.get_operator()) + .await?; + let Some(version) = snapshot.prev_table_seq else { + return Err(ErrorCode::IllegalStream( + "The stream navigation at point has not table version".to_string(), + )); + }; + + // The table version is the version of the table when the snapshot was created. + // We need make sure the version greater than the table version, + // and less equal than the table version after the snapshot commit. + (version + 1, Some(snapshot_loc)) } else { - MODE_STANDARD - }; - options.insert(OPT_KEY_MODE.to_string(), stream_mode.to_string()); + unreachable!() + } } None => { - let stream_mode = if plan.append_only { - MODE_APPEND_ONLY - } else { - MODE_STANDARD - }; - options.insert(OPT_KEY_MODE.to_string(), stream_mode.to_string()); - options.insert(OPT_KEY_TABLE_NAME.to_string(), plan.table_name.clone()); - options.insert( - OPT_KEY_DATABASE_NAME.to_string(), - plan.table_database.clone(), - ); - options.insert(OPT_KEY_TABLE_ID.to_string(), table_id.to_string()); - options.insert(OPT_KEY_TABLE_VER.to_string(), table_version.to_string()); let fuse_table = FuseTable::try_from_table(table.as_ref())?; - if let Some(snapshot_loc) = fuse_table.snapshot_loc().await? { - options.insert(OPT_KEY_SNAPSHOT_LOCATION.to_string(), snapshot_loc); - } + (table_version, fuse_table.snapshot_loc().await?) + } + }; + + if let Some(value) = table + .get_table_info() + .options() + .get(OPT_KEY_CHANGE_TRACKING_BEGIN_VER) + { + let begin_version = value.parse::()?; + if begin_version > version { + return Err(ErrorCode::IllegalStream(format!( + "Change tracking has been missing for the time range requested on table '{}.{}'", + plan.table_database, plan.table_name + ))); } } + let mut options = BTreeMap::new(); + let stream_mode = if plan.append_only { + MODE_APPEND_ONLY + } else { + MODE_STANDARD + }; + options.insert(OPT_KEY_MODE.to_string(), stream_mode.to_string()); + options.insert(OPT_KEY_TABLE_NAME.to_string(), plan.table_name.clone()); + options.insert( + OPT_KEY_DATABASE_NAME.to_string(), + plan.table_database.clone(), + ); + options.insert(OPT_KEY_TABLE_ID.to_string(), table_id.to_string()); + options.insert(OPT_KEY_TABLE_VER.to_string(), version.to_string()); + if let Some(snapshot_loc) = snapshot_location { + options.insert(OPT_KEY_SNAPSHOT_LOCATION.to_string(), snapshot_loc); + } let req = CreateTableReq { create_option: plan.create_option, diff --git a/src/query/ee/tests/it/main.rs b/src/query/ee/tests/it/main.rs index a8804192c4876..9bb0207fd13c6 100644 --- a/src/query/ee/tests/it/main.rs +++ b/src/query/ee/tests/it/main.rs @@ -18,3 +18,4 @@ mod background_service; mod inverted_index; mod license; mod storages; +mod stream; diff --git a/src/query/ee/tests/it/stream/mod.rs b/src/query/ee/tests/it/stream/mod.rs new file mode 100644 index 0000000000000..175d432713301 --- /dev/null +++ b/src/query/ee/tests/it/stream/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Databend Cloud +// +// Licensed under the Elastic License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.elastic.co/licensing/elastic-license +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod stream_create; diff --git a/src/query/ee/tests/it/stream/stream_create.rs b/src/query/ee/tests/it/stream/stream_create.rs new file mode 100644 index 0000000000000..fa15f1b794e1a --- /dev/null +++ b/src/query/ee/tests/it/stream/stream_create.rs @@ -0,0 +1,76 @@ +// Copyright 2023 Databend Cloud +// +// Licensed under the Elastic License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.elastic.co/licensing/elastic-license +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::Duration; +use chrono::Utc; +use databend_common_base::base::tokio; +use databend_common_exception::Result; +use databend_enterprise_query::test_kits::context::EESetup; +use databend_query::test_kits::generate_snapshots; +use databend_query::test_kits::TestFixture; + +#[tokio::test(flavor = "multi_thread")] +async fn test_stream_create() -> Result<()> { + let fixture = TestFixture::setup_with_custom(EESetup::new()).await?; + fixture.create_default_database().await?; + fixture.create_normal_table().await?; + + let db = fixture.default_db_name(); + let tbl = fixture.default_table_name(); + { + let qry = format!( + "alter table {}.{} set options(change_tracking=true)", + db, tbl + ); + let r = fixture.execute_command(&qry).await; + assert!(r.is_ok()); + } + + generate_snapshots(&fixture).await?; + let now = Utc::now(); + { + let time_point = now - Duration::hours(14); + let qry = format!( + "create stream s on table {}.{} at(timestamp => '{:?}'::TIMESTAMP)", + db, tbl, time_point + ); + let r = fixture.execute_command(&qry).await; + assert!(r.is_err()); + let expect = "TableHistoricalDataNotFound. Code: 2013, Text = No historical data found at given point."; + assert_eq!(expect, format!("{}", r.unwrap_err())); + } + + { + let time_point = now - Duration::hours(12); + let qry = format!( + "create stream s on table {}.{} at(timestamp => '{:?}'::TIMESTAMP)", + db, tbl, time_point + ); + let r = fixture.execute_command(&qry).await; + assert!(r.is_err()); + let expect = "IllegalStream. Code: 2733, Text = The stream navigation at point has not table version."; + assert_eq!(expect, format!("{}", r.unwrap_err())); + } + + { + let qry = format!( + "create stream s on table {}.{} append_only = false", + db, tbl + ); + let r = fixture.execute_command(&qry).await; + assert!(r.is_ok()); + } + + Ok(()) +} diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index 2741d6518e444..39e1806e364d5 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -418,6 +418,7 @@ impl MergeIntoInterpreter { let base_snapshot = fuse_table.read_table_snapshot().await?.unwrap_or_else(|| { Arc::new(TableSnapshot::new_empty_snapshot( fuse_table.schema().as_ref().clone(), + Some(table_info.ident.seq), )) }); diff --git a/src/query/service/src/interpreters/interpreter_replace.rs b/src/query/service/src/interpreters/interpreter_replace.rs index da6cfacb3e538..4047ffe68e732 100644 --- a/src/query/service/src/interpreters/interpreter_replace.rs +++ b/src/query/service/src/interpreters/interpreter_replace.rs @@ -162,7 +162,10 @@ impl ReplaceInterpreter { let table_info = fuse_table.get_table_info(); let base_snapshot = fuse_table.read_table_snapshot().await?.unwrap_or_else(|| { - Arc::new(TableSnapshot::new_empty_snapshot(schema.as_ref().clone())) + Arc::new(TableSnapshot::new_empty_snapshot( + schema.as_ref().clone(), + Some(table_info.ident.seq), + )) }); let is_multi_node = !self.ctx.get_cluster().is_empty(); diff --git a/src/query/service/src/interpreters/interpreter_table_add_column.rs b/src/query/service/src/interpreters/interpreter_table_add_column.rs index 607a065867b67..2f06b1c7988bb 100644 --- a/src/query/service/src/interpreters/interpreter_table_add_column.rs +++ b/src/query/service/src/interpreters/interpreter_table_add_column.rs @@ -153,7 +153,10 @@ pub(crate) async fn generate_new_snapshot( ) -> Result<()> { if let Ok(fuse_table) = FuseTable::try_from_table(table) { if let Some(snapshot) = fuse_table.read_table_snapshot().await? { - let mut new_snapshot = TableSnapshot::from_previous(snapshot.as_ref()); + let mut new_snapshot = TableSnapshot::from_previous( + snapshot.as_ref(), + Some(fuse_table.get_table_info().ident.seq), + ); // replace schema new_snapshot.schema = new_table_meta.schema.as_ref().clone(); diff --git a/src/query/service/src/test_kits/fuse.rs b/src/query/service/src/test_kits/fuse.rs index 1adb23368aeda..697a495ca9a95 100644 --- a/src/query/service/src/test_kits/fuse.rs +++ b/src/query/service/src/test_kits/fuse.rs @@ -66,7 +66,10 @@ pub async fn generate_snapshot_with_segments( let current_snapshot = fuse_table.read_table_snapshot().await?.unwrap(); let operator = fuse_table.get_operator(); let location_gen = fuse_table.meta_location_generator(); - let mut new_snapshot = TableSnapshot::from_previous(current_snapshot.as_ref()); + let mut new_snapshot = TableSnapshot::from_previous( + current_snapshot.as_ref(), + Some(fuse_table.get_table_info().ident.seq), + ); new_snapshot.segments = segment_locations; let new_snapshot_location = location_gen .snapshot_location_from_uuid(&new_snapshot.snapshot_id, TableSnapshot::VERSION)?; @@ -201,6 +204,7 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> { let locations = vec![segments_v3[0].0.clone(), segments_v2[0].0.clone()]; let mut snapshot_1 = TableSnapshot::new( Uuid::new_v4(), + None, &snapshot_0.timestamp, Some((snapshot_0.snapshot_id, TableSnapshotV2::VERSION)), schema.as_ref().clone(), @@ -224,7 +228,7 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> { segments_v3[0].0.clone(), segments_v2[0].0.clone(), ]; - let mut snapshot_2 = TableSnapshot::from_previous(&snapshot_1); + let mut snapshot_2 = TableSnapshot::from_previous(&snapshot_1, None); snapshot_2.segments = locations; snapshot_2.timestamp = Some(now); snapshot_2.summary = merge_statistics(&snapshot_1.summary, &segments_v3[1].1.summary, None); diff --git a/src/query/service/tests/it/storages/fuse/conflict.rs b/src/query/service/tests/it/storages/fuse/conflict.rs index 5e2732b26f648..efe9274dde066 100644 --- a/src/query/service/tests/it/storages/fuse/conflict.rs +++ b/src/query/service/tests/it/storages/fuse/conflict.rs @@ -35,14 +35,14 @@ use databend_storages_common_table_meta::meta::TableSnapshot; /// /// so the delete operation cannot be applied fn test_unresolvable_delete_conflict() { - let mut base_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default()); + let mut base_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default(), None); base_snapshot.segments = vec![ ("1".to_string(), 1), ("2".to_string(), 1), ("3".to_string(), 1), ]; - let mut latest_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default()); + let mut latest_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default(), None); latest_snapshot.segments = vec![("1".to_string(), 1), ("4".to_string(), 1)]; let ctx = ConflictResolveContext::ModifiedSegmentExistsInLatest(SnapshotChanges { @@ -60,6 +60,7 @@ fn test_unresolvable_delete_conflict() { TableSchema::default(), None, Some(Arc::new(latest_snapshot)), + None, ); assert!(result.is_err()); } @@ -75,7 +76,7 @@ fn test_unresolvable_delete_conflict() { /// /// the delete operation is merged into the latest snapshot, by removing segments 2, 3, and adding segment 8 in the latest snapshot fn test_resolvable_delete_conflict() { - let mut base_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default()); + let mut base_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default(), None); base_snapshot.segments = vec![ ("1".to_string(), 1), ("2".to_string(), 1), @@ -93,7 +94,7 @@ fn test_resolvable_delete_conflict() { cluster_stats: None, }; - let mut latest_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default()); + let mut latest_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default(), None); latest_snapshot.segments = vec![ ("2".to_string(), 1), ("3".to_string(), 1), @@ -148,6 +149,7 @@ fn test_resolvable_delete_conflict() { TableSchema::default(), None, Some(Arc::new(latest_snapshot)), + None, ); let snapshot = result.unwrap(); let expected = vec![("8".to_string(), 1), ("4".to_string(), 1)]; @@ -178,7 +180,7 @@ fn test_resolvable_delete_conflict() { /// /// the replace operation is merged into the latest snapshot, by removing segments 2, 3, and adding segment 6,5 in the latest snapshot fn test_resolvable_replace_conflict() { - let mut base_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default()); + let mut base_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default(), None); base_snapshot.segments = vec![ ("1".to_string(), 1), ("2".to_string(), 1), @@ -196,7 +198,7 @@ fn test_resolvable_replace_conflict() { cluster_stats: None, }; - let mut latest_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default()); + let mut latest_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default(), None); latest_snapshot.segments = vec![ ("2".to_string(), 1), ("3".to_string(), 1), @@ -251,6 +253,7 @@ fn test_resolvable_replace_conflict() { TableSchema::default(), None, Some(Arc::new(latest_snapshot)), + None, ); let snapshot = result.unwrap(); let expected = vec![ diff --git a/src/query/service/tests/it/storages/fuse/meta/snapshot.rs b/src/query/service/tests/it/storages/fuse/meta/snapshot.rs index 0fe4cd28fccf4..b7016946c74fb 100644 --- a/src/query/service/tests/it/storages/fuse/meta/snapshot.rs +++ b/src/query/service/tests/it/storages/fuse/meta/snapshot.rs @@ -26,7 +26,18 @@ fn default_snapshot() -> TableSnapshot { let uuid = Uuid::new_v4(); let schema = TableSchema::empty(); let stats = Default::default(); - TableSnapshot::new(uuid, &None, None, schema, stats, vec![], None, None, None) + TableSnapshot::new( + uuid, + None, + &None, + None, + schema, + stats, + vec![], + None, + None, + None, + ) } #[test] @@ -42,6 +53,7 @@ fn snapshot_timestamp_monotonic_increase() { let uuid = Uuid::new_v4(); let current = TableSnapshot::new( uuid, + None, &prev.timestamp, prev.prev_snapshot_id, schema, @@ -67,6 +79,7 @@ fn snapshot_timestamp_time_skew_tolerance() { let current = TableSnapshot::new( uuid, + None, &prev.timestamp, prev.prev_snapshot_id, schema, diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index 342890f4cd699..55f039fecb2cf 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -261,6 +261,7 @@ async fn test_commit_to_meta_server() -> Result<()> { let new_segments = vec![("do not care".to_string(), SegmentInfo::VERSION)]; let new_snapshot = TableSnapshot::new( Uuid::new_v4(), + None, &None, None, table.schema().as_ref().clone(), diff --git a/src/query/service/tests/it/storages/fuse/operations/gc.rs b/src/query/service/tests/it/storages/fuse/operations/gc.rs index 45f6e516ddfc6..1948bcd15c86e 100644 --- a/src/query/service/tests/it/storages/fuse/operations/gc.rs +++ b/src/query/service/tests/it/storages/fuse/operations/gc.rs @@ -91,7 +91,7 @@ async fn test_fuse_purge_normal_orphan_snapshot() -> Result<()> { .snapshot_location_from_uuid(&orphan_snapshot_id, TableSnapshot::VERSION)?; // orphan_snapshot is created by using `from_previous`, which guarantees // that the timestamp of snapshot returned is larger than `current_snapshot`'s. - let orphan_snapshot = TableSnapshot::from_previous(current_snapshot.as_ref()); + let orphan_snapshot = TableSnapshot::from_previous(current_snapshot.as_ref(), None); orphan_snapshot .write_meta(&operator, &orphan_snapshot_location) .await?; diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs index 02f56f2168cf3..84eee36d073d2 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs @@ -207,6 +207,7 @@ async fn test_safety() -> Result<()> { let id = Uuid::new_v4(); let snapshot = TableSnapshot::new( id, + None, &None, None, schema.as_ref().clone(), diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs index ca87f7bcd8eed..7e0d9a20ab926 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs @@ -121,7 +121,7 @@ async fn test_recluster_mutator_block_select() -> Result<()> { test_segment_locations.push(segment_location); test_block_locations.push(block_location); // unused snapshot. - let snapshot = TableSnapshot::new_empty_snapshot(schema.as_ref().clone()); + let snapshot = TableSnapshot::new_empty_snapshot(schema.as_ref().clone(), None); let ctx: Arc = ctx.clone(); let segment_locations = create_segment_location_vector(test_segment_locations, None); @@ -218,6 +218,7 @@ async fn test_safety_for_recluster() -> Result<()> { let id = Uuid::new_v4(); let snapshot = Arc::new(TableSnapshot::new( id, + None, &None, None, schema.as_ref().clone(), diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index 4ca94fa9af12e..189ed3283669a 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -606,7 +606,7 @@ impl<'a> Binder { } // Streams - Statement::CreateStream(stmt) => self.bind_create_stream(stmt).await?, + Statement::CreateStream(stmt) => self.bind_create_stream(bind_context, stmt).await?, Statement::DropStream(stmt) => self.bind_drop_stream(stmt).await?, Statement::ShowStreams(stmt) => self.bind_show_streams(bind_context, stmt).await?, Statement::DescribeStream(stmt) => self.bind_describe_stream(bind_context, stmt).await?, diff --git a/src/query/sql/src/planner/binder/ddl/stream.rs b/src/query/sql/src/planner/binder/ddl/stream.rs index a43882f5beae9..cb24311ff7222 100644 --- a/src/query/sql/src/planner/binder/ddl/stream.rs +++ b/src/query/sql/src/planner/binder/ddl/stream.rs @@ -37,6 +37,7 @@ impl Binder { #[async_backtrace::framed] pub(in crate::planner::binder) async fn bind_create_stream( &mut self, + bind_context: &mut BindContext, stmt: &CreateStreamStmt, ) -> Result { let CreateStreamStmt { @@ -61,16 +62,21 @@ impl Binder { .unwrap_or_else(|| self.ctx.get_current_database()); let table_name = normalize_identifier(table, &self.name_resolution_ctx).name; - let navigation = stream_point.as_ref().map(|point| match point { - StreamPoint::AtStream { database, name } => { + let navigation = match stream_point { + Some(StreamPoint::AtStream { database, name }) => { let database = database .as_ref() .map(|ident| normalize_identifier(ident, &self.name_resolution_ctx).name) .unwrap_or_else(|| self.ctx.get_current_database()); let name = normalize_identifier(name, &self.name_resolution_ctx).name; - StreamNavigation::AtStream { database, name } + Some(StreamNavigation::AtStream { database, name }) + } + Some(StreamPoint::AtPoint(point)) => { + let point = self.resolve_data_travel_point(bind_context, point).await?; + Some(StreamNavigation::AtPoint(point)) } - }); + None => None, + }; let plan = CreateStreamPlan { create_option: *create_option, diff --git a/src/query/sql/src/planner/plans/ddl/stream.rs b/src/query/sql/src/planner/plans/ddl/stream.rs index 4fda39163cea0..76e34212a5544 100644 --- a/src/query/sql/src/planner/plans/ddl/stream.rs +++ b/src/query/sql/src/planner/plans/ddl/stream.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_catalog::table::NavigationPoint; use databend_common_meta_app::schema::CreateOption; #[derive(Debug, Clone, Eq, PartialEq)] pub enum StreamNavigation { AtStream { database: String, name: String }, + AtPoint(NavigationPoint), } #[derive(Clone, Debug, PartialEq, Eq)] diff --git a/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs b/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs index 23625825c6378..6b5428244227a 100644 --- a/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs +++ b/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs @@ -66,6 +66,9 @@ pub struct TableSnapshot { // for backward compatibility, `Option` is used pub timestamp: Option>, + // The table seq before snapshot commit. + pub prev_table_seq: Option, + /// previous snapshot pub prev_snapshot_id: Option<(SnapshotId, FormatVersion)>, @@ -92,6 +95,7 @@ pub struct TableSnapshot { impl TableSnapshot { pub fn new( snapshot_id: SnapshotId, + prev_table_seq: Option, prev_timestamp: &Option>, prev_snapshot_id: Option<(SnapshotId, FormatVersion)>, schema: TableSchema, @@ -113,6 +117,7 @@ impl TableSnapshot { format_version: TableSnapshot::VERSION, snapshot_id, timestamp, + prev_table_seq, prev_snapshot_id, schema, summary, @@ -123,9 +128,10 @@ impl TableSnapshot { } } - pub fn new_empty_snapshot(schema: TableSchema) -> Self { + pub fn new_empty_snapshot(schema: TableSchema, prev_table_seq: Option) -> Self { Self::new( Uuid::new_v4(), + prev_table_seq, &None, None, schema, @@ -137,12 +143,13 @@ impl TableSnapshot { ) } - pub fn from_previous(previous: &TableSnapshot) -> Self { + pub fn from_previous(previous: &TableSnapshot, prev_table_seq: Option) -> Self { let id = Uuid::new_v4(); let clone = previous.clone(); // the timestamp of the new snapshot will be adjusted by the `new` method Self::new( id, + prev_table_seq, &clone.timestamp, Some((clone.snapshot_id, clone.format_version)), clone.schema, @@ -223,6 +230,7 @@ impl From for TableSnapshot { format_version: s.format_version, snapshot_id: s.snapshot_id, timestamp: s.timestamp, + prev_table_seq: None, prev_snapshot_id: s.prev_snapshot_id, schema: s.schema, summary: s.summary, @@ -245,6 +253,7 @@ where T: Into format_version: s.format_version, snapshot_id: s.snapshot_id, timestamp: s.timestamp, + prev_table_seq: None, prev_snapshot_id: s.prev_snapshot_id, schema: s.schema.into(), summary: s.summary.into(), diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 9e6dfb0084bc0..4339eaa724bcb 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -562,8 +562,11 @@ impl Table for FuseTable { (FuseStatistics::default(), vec![]) }; + let table_version = Some(self.get_table_info().ident.seq); + let new_snapshot = TableSnapshot::new( Uuid::new_v4(), + table_version, &prev_timestamp, prev_snapshot_id, schema, @@ -615,8 +618,11 @@ impl Table for FuseTable { (FuseStatistics::default(), vec![]) }; + let table_version = Some(self.get_table_info().ident.seq); + let new_snapshot = TableSnapshot::new( Uuid::new_v4(), + table_version, &prev_timestamp, prev_snapshot_id, schema, diff --git a/src/query/storages/fuse/src/io/write/meta_writer.rs b/src/query/storages/fuse/src/io/write/meta_writer.rs index cfe8dbe00ed4a..2f0eca2687517 100644 --- a/src/query/storages/fuse/src/io/write/meta_writer.rs +++ b/src/query/storages/fuse/src/io/write/meta_writer.rs @@ -136,6 +136,7 @@ mod tests { let r = catch_unwind(|| { let mut snapshot = TableSnapshot::new( SnapshotId::new_v4(), + None, &None, None, TableSchema::default(), @@ -154,6 +155,7 @@ mod tests { // current version allowed let snapshot = TableSnapshot::new( SnapshotId::new_v4(), + None, &None, None, TableSchema::default(), diff --git a/src/query/storages/fuse/src/operations/analyze.rs b/src/query/storages/fuse/src/operations/analyze.rs index a8a5979ffe263..134d3a7da2b63 100644 --- a/src/query/storages/fuse/src/operations/analyze.rs +++ b/src/query/storages/fuse/src/operations/analyze.rs @@ -175,7 +175,8 @@ impl SinkAnalyzeState { let (col_stats, cluster_stats) = regenerate_statistics(table, snapshot.as_ref(), &self.ctx).await?; // 4. Save table statistics - let mut new_snapshot = TableSnapshot::from_previous(&snapshot); + let mut new_snapshot = + TableSnapshot::from_previous(&snapshot, Some(table.get_table_info().ident.seq)); new_snapshot.summary.col_stats = col_stats; new_snapshot.summary.cluster_stats = cluster_stats; new_snapshot.table_statistics_location = Some(table_statistics_location); diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index 82db9fb58c594..ce682c4c022c1 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -310,8 +310,10 @@ impl FuseTable { ctx.set_status_info("mutation: begin try to commit"); loop { - let mut snapshot_tobe_committed = - TableSnapshot::from_previous(latest_snapshot.as_ref()); + let mut snapshot_tobe_committed = TableSnapshot::from_previous( + latest_snapshot.as_ref(), + Some(latest_table_info.ident.seq), + ); let schema = self.schema(); let (segments_tobe_committed, statistics_tobe_committed) = Self::merge_with_base( diff --git a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs index fdebfd74262fa..89558aa1082e0 100644 --- a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs @@ -117,6 +117,7 @@ impl SnapshotGenerator for AppendGenerator { schema: TableSchema, cluster_key_meta: Option, previous: Option>, + prev_table_seq: Option, ) -> Result { let (snapshot_merged, expected_schema) = self.conflict_resolve_ctx()?; if is_column_type_modified(&schema, expected_schema) { @@ -213,6 +214,7 @@ impl SnapshotGenerator for AppendGenerator { Ok(TableSnapshot::new( Uuid::new_v4(), + prev_table_seq, &prev_timestamp, prev_snapshot_id, schema, diff --git a/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs index 52434b568af74..1a1a12303ca60 100644 --- a/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs @@ -58,11 +58,16 @@ impl SnapshotGenerator for MutationGenerator { schema: TableSchema, cluster_key_meta: Option, previous: Option>, + prev_table_seq: Option, ) -> Result { let default_cluster_key_id = cluster_key_meta.clone().map(|v| v.0); - let previous = - previous.unwrap_or_else(|| Arc::new(TableSnapshot::new_empty_snapshot(schema.clone()))); + let previous = previous.unwrap_or_else(|| { + Arc::new(TableSnapshot::new_empty_snapshot( + schema.clone(), + prev_table_seq, + )) + }); match &self.conflict_resolve_ctx { ConflictResolveContext::ModifiedSegmentExistsInLatest(ctx) => { if let Some((removed, replaced)) = @@ -89,6 +94,7 @@ impl SnapshotGenerator for MutationGenerator { deduct_statistics_mut(&mut new_summary, &ctx.removed_statistics); let new_snapshot = TableSnapshot::new( Uuid::new_v4(), + prev_table_seq, &previous.timestamp, Some((previous.snapshot_id, previous.format_version)), schema, diff --git a/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs b/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs index ebe1a5645023b..440f2cac3b15f 100644 --- a/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs @@ -42,5 +42,6 @@ pub trait SnapshotGenerator { schema: TableSchema, cluster_key_meta: Option, previous: Option>, + prev_table_seq: Option, ) -> Result; } diff --git a/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs b/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs index 352c978b22e73..0f5021db14a59 100644 --- a/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs @@ -59,6 +59,7 @@ impl SnapshotGenerator for TruncateGenerator { schema: TableSchema, cluster_key_meta: Option, previous: Option>, + prev_table_seq: Option, ) -> Result { let (prev_timestamp, prev_snapshot_id) = if let Some(prev_snapshot) = previous { ( @@ -71,6 +72,7 @@ impl SnapshotGenerator for TruncateGenerator { let new_snapshot = TableSnapshot::new( Uuid::new_v4(), + prev_table_seq, &prev_timestamp, prev_snapshot_id, schema, diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index 497aba039f2e4..6061b44dfeb3b 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -270,6 +270,7 @@ where F: SnapshotGenerator + Send + 'static schema, cluster_key_meta, previous, + Some(table_info.ident.seq), ) { Ok(snapshot) => { self.state = State::TryCommit { diff --git a/src/query/storages/stream/src/stream_table.rs b/src/query/storages/stream/src/stream_table.rs index 2d5955c1b3331..a13054ef48a72 100644 --- a/src/query/storages/stream/src/stream_table.rs +++ b/src/query/storages/stream/src/stream_table.rs @@ -162,14 +162,14 @@ impl StreamTable { .options() .get(OPT_KEY_CHANGE_TRACKING_BEGIN_VER) { - let begin_version = value.to_lowercase().parse::()?; + let begin_version = value.parse::()?; if begin_version > self.table_version { return Err(ErrorCode::IllegalStream(format!( "Change tracking has been missing for the time range requested on table '{}.{}'", self.table_database, self.table_name ))); } - }; + } Ok(table) } diff --git a/tests/suites/5_ee/05_stream/05_0002_ee_stream_create.result b/tests/suites/5_ee/05_stream/05_0002_ee_stream_create.result new file mode 100644 index 0000000000000..ba044695046f1 --- /dev/null +++ b/tests/suites/5_ee/05_stream/05_0002_ee_stream_create.result @@ -0,0 +1,7 @@ +Error: APIError: ResponseError with 2733: Change tracking has been missing for the time range requested on table 'db_stream.base' +3 INSERT false true +Error: APIError: ResponseError with 2733: Change tracking has been missing for the time range requested on table 'db_stream.base' +3 INSERT false true +Error: APIError: ResponseError with 2733: Change tracking has been missing for the time range requested on table 'db_stream.base' +Error: APIError: ResponseError with 2733: Change tracking has been missing for the time range requested on table 'db_stream.base' +Error: APIError: ResponseError with 2733: Change tracking has been missing for the time range requested on table 'db_stream.base' diff --git a/tests/suites/5_ee/05_stream/05_0002_ee_stream_create.sh b/tests/suites/5_ee/05_stream/05_0002_ee_stream_create.sh new file mode 100755 index 0000000000000..38101aa2c67f4 --- /dev/null +++ b/tests/suites/5_ee/05_stream/05_0002_ee_stream_create.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. "$CURDIR"/../../../shell_env.sh + +echo "drop database if exists db_stream" | $BENDSQL_CLIENT_CONNECT +echo "create database db_stream" | $BENDSQL_CLIENT_CONNECT +echo "create table db_stream.base(a int)" | $BENDSQL_CLIENT_CONNECT +echo "insert into db_stream.base values(1)" | $BENDSQL_CLIENT_CONNECT +echo "alter table db_stream.base set options(change_tracking = true)" | $BENDSQL_CLIENT_CONNECT +echo "insert into db_stream.base values(2)" | $BENDSQL_CLIENT_CONNECT +echo "insert into db_stream.base values(3)" | $BENDSQL_CLIENT_CONNECT + +BASE_ROW_ID=$(echo "select _base_row_id from db_stream.base where a = 3" | $BENDSQL_CLIENT_CONNECT) + +SNAPSHOT_ID_1=$(echo "select snapshot_id from fuse_snapshot('db_stream','base') where row_count=1 limit 1" | $BENDSQL_CLIENT_CONNECT) +echo "create stream db_stream.s1 on table db_stream.base at (snapshot => '$SNAPSHOT_ID_1')" | $BENDSQL_CLIENT_CONNECT + +SNAPSHOT_ID_2=$(echo "select snapshot_id from fuse_snapshot('db_stream','base') where row_count=2 limit 1" | $BENDSQL_CLIENT_CONNECT) +echo "create stream db_stream.s2 on table db_stream.base at (snapshot => '$SNAPSHOT_ID_2')" | $BENDSQL_CLIENT_CONNECT +echo "select a, change\$action, change\$is_update, change\$row_id='$BASE_ROW_ID' from db_stream.s2" | $BENDSQL_CLIENT_CONNECT + +TIMEPOINT_1=$(echo "select timestamp from fuse_snapshot('db_stream','base') where row_count=1 limit 1" | $BENDSQL_CLIENT_CONNECT) +echo "create stream db_stream.t1 on table db_stream.base at (timestamp => '$TIMEPOINT_1'::TIMESTAMP)" | $BENDSQL_CLIENT_CONNECT + +echo "alter table db_stream.base set options(change_tracking = true)" | $BENDSQL_CLIENT_CONNECT +TIMEPOINT_2=$(echo "select timestamp from fuse_snapshot('db_stream','base') where row_count=2 limit 1" | $BENDSQL_CLIENT_CONNECT) +echo "create stream db_stream.t2 on table db_stream.base at (timestamp => '$TIMEPOINT_2'::TIMESTAMP)" | $BENDSQL_CLIENT_CONNECT +echo "select a, change\$action, change\$is_update, change\$row_id='$BASE_ROW_ID' from db_stream.t2" | $BENDSQL_CLIENT_CONNECT + +echo "alter table db_stream.base set options(change_tracking = false)" | $BENDSQL_CLIENT_CONNECT +echo "create stream db_stream.s3 on table db_stream.base at (snapshot => '$SNAPSHOT_ID_2')" | $BENDSQL_CLIENT_CONNECT +echo "alter table db_stream.base set options(change_tracking = true)" | $BENDSQL_CLIENT_CONNECT +echo "create stream db_stream.s4 on table db_stream.base at (stream => db_stream.s2)" | $BENDSQL_CLIENT_CONNECT +echo "select a from db_stream.s2" | $BENDSQL_CLIENT_CONNECT + +echo "drop stream if exists db_stream.s2" | $BENDSQL_CLIENT_CONNECT +echo "drop stream if exists db_stream.t2" | $BENDSQL_CLIENT_CONNECT +echo "drop table if exists db_stream.base all" | $BENDSQL_CLIENT_CONNECT +echo "drop database if exists db_stream" | $BENDSQL_CLIENT_CONNECT diff --git a/tests/suites/7_management/00_stream_status/00_0000_stream_status.result b/tests/suites/7_management/00_stream_status/00_0000_stream_status.result index da29283aaa47c..4b095fd0ff9ca 100644 --- a/tests/suites/7_management/00_stream_status/00_0000_stream_status.result +++ b/tests/suites/7_management/00_stream_status/00_0000_stream_status.result @@ -1,2 +1,2 @@ -true +false false diff --git a/tests/suites/7_management/00_stream_status/00_0000_stream_status.sh b/tests/suites/7_management/00_stream_status/00_0000_stream_status.sh index 9bc5754b306ba..68cd6ead3c784 100755 --- a/tests/suites/7_management/00_stream_status/00_0000_stream_status.sh +++ b/tests/suites/7_management/00_stream_status/00_0000_stream_status.sh @@ -8,13 +8,12 @@ echo "drop database if exists db_stream" | $BENDSQL_CLIENT_CONNECT echo "CREATE DATABASE db_stream" | $BENDSQL_CLIENT_CONNECT echo "create table db_stream.t(a int)" | $BENDSQL_CLIENT_CONNECT echo "create stream default.s1 on table db_stream.t comment = 'test'" | $BENDSQL_CLIENT_CONNECT -echo "create stream db_stream.s2 on table db_stream.t comment = 'test'" | $BENDSQL_CLIENT_CONNECT +echo "create stream db_stream.s2 on table db_stream.t at(stream => default.s1)" | $BENDSQL_CLIENT_CONNECT -# When s1 is created, t automatically turns on change tracking. So the offset of s1 and the version of t do not match. curl -X GET -s http://localhost:8081/v1/tenants/system/stream_status\?stream_name=s1 | jq .has_data curl -X GET -s http://localhost:8081/v1/tenants/system/stream_status\?stream_name\=s2\&database\=db_stream | jq .has_data echo "drop stream if exists default.s1" | $BENDSQL_CLIENT_CONNECT echo "drop stream if exists db_stream.s2" | $BENDSQL_CLIENT_CONNECT -echo "drop table if exists db_stream.t" | $BENDSQL_CLIENT_CONNECT +echo "drop table if exists db_stream.t all" | $BENDSQL_CLIENT_CONNECT echo "drop database if exists db_stream" | $BENDSQL_CLIENT_CONNECT