Skip to content

Commit

Permalink
feat: create stream at navigation point (#15062)
Browse files Browse the repository at this point in the history
* table version opt

* create stream at point

* add table_version in snapshot

* add begin version check

* add test

* fix review comment

* fix test
  • Loading branch information
zhyass authored Mar 26, 2024
1 parent fcf5ba8 commit 40d2b84
Show file tree
Hide file tree
Showing 41 changed files with 478 additions and 79 deletions.
1 change: 1 addition & 0 deletions src/query/ast/src/ast/format/ast_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3292,6 +3292,7 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor {
fn visit_stream_point(&mut self, point: &'ast StreamPoint) {
match point {
StreamPoint::AtStream { database, name } => self.visit_table_ref(&None, database, name),
StreamPoint::AtPoint(point) => self.visit_time_travel_point(point),
}
}

Expand Down
39 changes: 21 additions & 18 deletions src/query/ast/src/ast/format/syntax/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,24 +335,27 @@ pub(crate) fn pretty_create_stream(stmt: CreateStreamStmt) -> RcDoc<'static> {
.append(RcDoc::text(stmt.table.to_string())),
),
)
.append(
if let Some(StreamPoint::AtStream { database, name }) = stmt.stream_point {
RcDoc::space()
.append(RcDoc::text("AT (STREAM => "))
.append(
RcDoc::space()
.append(if let Some(database) = database {
RcDoc::text(database.to_string()).append(RcDoc::text("."))
} else {
RcDoc::nil()
})
.append(RcDoc::text(name.to_string())),
)
.append(RcDoc::text(")"))
} else {
RcDoc::nil()
},
)
.append(match stmt.stream_point {
Some(StreamPoint::AtPoint(TimeTravelPoint::Snapshot(sid))) => {
RcDoc::text(format!(" AT (SNAPSHOT => {sid})"))
}
Some(StreamPoint::AtPoint(TimeTravelPoint::Timestamp(ts))) => {
RcDoc::text(format!(" AT (TIMESTAMP => {ts})"))
}
Some(StreamPoint::AtStream { database, name }) => RcDoc::space()
.append(RcDoc::text("AT (STREAM => "))
.append(
RcDoc::space()
.append(if let Some(database) = database {
RcDoc::text(database.to_string()).append(RcDoc::text("."))
} else {
RcDoc::nil()
})
.append(RcDoc::text(name.to_string())),
)
.append(RcDoc::text(")")),
None => RcDoc::nil(),
})
.append(if !stmt.append_only {
RcDoc::space().append(RcDoc::text("APPEND_ONLY = false"))
} else {
Expand Down
3 changes: 3 additions & 0 deletions src/query/ast/src/ast/statements/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ use derive_visitor::DriveMut;
use crate::ast::write_dot_separated_list;
use crate::ast::Identifier;
use crate::ast::ShowLimit;
use crate::ast::TimeTravelPoint;

#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub enum StreamPoint {
AtStream {
database: Option<Identifier>,
name: Identifier,
},
AtPoint(TimeTravelPoint),
}

impl Display for StreamPoint {
Expand All @@ -39,6 +41,7 @@ impl Display for StreamPoint {
write_dot_separated_list(f, database.iter().chain(Some(name)))?;
write!(f, ")")
}
StreamPoint::AtPoint(point) => write!(f, " AT {}", point),
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/query/ast/src/ast/visitors/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,9 @@ pub fn walk_stream_point<'a, V: Visitor<'a>>(visitor: &mut V, point: &'a StreamP

visitor.visit_identifier(name);
}
StreamPoint::AtPoint(point) => {
visitor.visit_time_travel_point(point);
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/query/ast/src/ast/visitors/walk_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ pub fn walk_stream_point_mut<V: VisitorMut>(visitor: &mut V, stream: &mut Stream

visitor.visit_identifier(name);
}
StreamPoint::AtPoint(point) => visitor.visit_time_travel_point(point),
}
}

Expand Down
11 changes: 8 additions & 3 deletions src/query/ast/src/parser/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use nom::combinator::map;

use super::statement::parse_create_option;
use crate::ast::CreateStreamStmt;
use crate::ast::DescribeStreamStmt;
use crate::ast::DropStreamStmt;
Expand All @@ -27,6 +26,8 @@ use crate::parser::common::map_res;
use crate::parser::common::IResult;
use crate::parser::expr::literal_bool;
use crate::parser::expr::literal_string;
use crate::parser::query::travel_point;
use crate::parser::statement::parse_create_option;
use crate::parser::statement::show_limit;
use crate::parser::token::TokenKind::*;
use crate::parser::Input;
Expand Down Expand Up @@ -100,12 +101,16 @@ fn drop_stream(i: Input) -> IResult<Statement> {
}

fn stream_point(i: Input) -> IResult<StreamPoint> {
let mut at_stream = map(
let at_stream = map(
rule! { AT ~ "(" ~ STREAM ~ "=>" ~ #dot_separated_idents_1_to_2 ~ ")" },
|(_, _, _, _, (database, name), _)| StreamPoint::AtStream { database, name },
);
let at_point = map(rule! { AT ~ ^#travel_point }, |(_, travel_point)| {
StreamPoint::AtPoint(travel_point)
});

rule!(
#at_stream
#at_stream | #at_point
)(i)
}

Expand Down
2 changes: 2 additions & 0 deletions src/query/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ fn test_statement() {
r#"show full views from ctl.db"#,
r#"create stream test2.s1 on table test.t append_only = false;"#,
r#"create stream if not exists test2.s2 on table test.t at (stream => test1.s1) comment = 'this is a stream';"#,
r#"create stream if not exists test2.s3 on table test.t at (TIMESTAMP => '2023-06-26 09:49:02.038483'::TIMESTAMP) append_only = false;"#,
r#"create stream if not exists test2.s3 on table test.t at (SNAPSHOT => '9828b23f74664ff3806f44bbc1925ea5') append_only = true;"#,
r#"create or replace stream test2.s1 on table test.t append_only = false;"#,
r#"show full streams from default.test2 like 's%';"#,
r#"describe stream test2.s2;"#,
Expand Down
130 changes: 130 additions & 0 deletions src/query/ast/tests/it/testdata/statement.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3692,6 +3692,136 @@ CreateStream(
)


---------- Input ----------
create stream if not exists test2.s3 on table test.t at (TIMESTAMP => '2023-06-26 09:49:02.038483'::TIMESTAMP) append_only = false;
---------- Output ---------
CREATE STREAM IF NOT EXISTS test2.s3 ON TABLE test.t AT (TIMESTAMP => '2023-06-26 09:49:02.038483'::TIMESTAMP) APPEND_ONLY = false
---------- AST ------------
CreateStream(
CreateStreamStmt {
create_option: CreateIfNotExists,
catalog: None,
database: Some(
Identifier {
span: Some(
28..33,
),
name: "test2",
quote: None,
is_hole: false,
},
),
stream: Identifier {
span: Some(
34..36,
),
name: "s3",
quote: None,
is_hole: false,
},
table_database: Some(
Identifier {
span: Some(
46..50,
),
name: "test",
quote: None,
is_hole: false,
},
),
table: Identifier {
span: Some(
51..52,
),
name: "t",
quote: None,
is_hole: false,
},
stream_point: Some(
AtPoint(
Timestamp(
Cast {
span: Some(
98..109,
),
expr: Literal {
span: Some(
70..98,
),
value: String(
"2023-06-26 09:49:02.038483",
),
},
target_type: Timestamp,
pg_style: true,
},
),
),
),
append_only: false,
comment: None,
},
)


---------- Input ----------
create stream if not exists test2.s3 on table test.t at (SNAPSHOT => '9828b23f74664ff3806f44bbc1925ea5') append_only = true;
---------- Output ---------
CREATE STREAM IF NOT EXISTS test2.s3 ON TABLE test.t AT (SNAPSHOT => '9828b23f74664ff3806f44bbc1925ea5')
---------- AST ------------
CreateStream(
CreateStreamStmt {
create_option: CreateIfNotExists,
catalog: None,
database: Some(
Identifier {
span: Some(
28..33,
),
name: "test2",
quote: None,
is_hole: false,
},
),
stream: Identifier {
span: Some(
34..36,
),
name: "s3",
quote: None,
is_hole: false,
},
table_database: Some(
Identifier {
span: Some(
46..50,
),
name: "test",
quote: None,
is_hole: false,
},
),
table: Identifier {
span: Some(
51..52,
),
name: "t",
quote: None,
is_hole: false,
},
stream_point: Some(
AtPoint(
Snapshot(
"9828b23f74664ff3806f44bbc1925ea5",
),
),
),
append_only: true,
comment: None,
},
)


---------- Input ----------
create or replace stream test2.s1 on table test.t append_only = false;
---------- Output ---------
Expand Down
5 changes: 4 additions & 1 deletion src/query/ee/src/inverted_index/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,10 @@ impl Indexer {
write_data(index_bytes, operator, &new_index_info_loc).await?;

// Generate new table snapshot file
let mut new_snapshot = TableSnapshot::from_previous(snapshot.as_ref());
let mut new_snapshot = TableSnapshot::from_previous(
snapshot.as_ref(),
Some(fuse_table.get_table_info().ident.seq),
);
let mut index_info_locations = BTreeMap::new();
if let Some(old_index_info_locations) = &snapshot.index_info_locations {
for (old_index_name, location) in old_index_info_locations {
Expand Down
Loading

0 comments on commit 40d2b84

Please sign in to comment.