Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Mar 25, 2024
1 parent b5f85b4 commit a5090b5
Show file tree
Hide file tree
Showing 10 changed files with 275 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/query/ast/src/ast/statements/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl Display for StreamPoint {
write_dot_separated_list(f, database.iter().chain(Some(name)))?;
write!(f, ")")
}
StreamPoint::AtPoint(point) => write!(f, " AT{}", point),
StreamPoint::AtPoint(point) => write!(f, " AT {}", point),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/query/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ fn test_statement() {
r#"show full views from ctl.db"#,
r#"create stream test2.s1 on table test.t append_only = false;"#,
r#"create stream if not exists test2.s2 on table test.t at (stream => test1.s1) comment = 'this is a stream';"#,
r#"create stream if not exists test2.s3 on table test.t at (TIMESTAMP => '2023-06-26 09:49:02.038483'::TIMESTAMP) append_only = false;"#,
r#"create stream if not exists test2.s3 on table test.t at (SNAPSHOT => '9828b23f74664ff3806f44bbc1925ea5') append_only = true;"#,
r#"create or replace stream test2.s1 on table test.t append_only = false;"#,
r#"show full streams from default.test2 like 's%';"#,
r#"describe stream test2.s2;"#,
Expand Down
130 changes: 130 additions & 0 deletions src/query/ast/tests/it/testdata/statement.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3692,6 +3692,136 @@ CreateStream(
)


---------- Input ----------
create stream if not exists test2.s3 on table test.t at (TIMESTAMP => '2023-06-26 09:49:02.038483'::TIMESTAMP) append_only = false;
---------- Output ---------
CREATE STREAM IF NOT EXISTS test2.s3 ON TABLE test.t AT (TIMESTAMP => '2023-06-26 09:49:02.038483'::TIMESTAMP) APPEND_ONLY = false
---------- AST ------------
CreateStream(
CreateStreamStmt {
create_option: CreateIfNotExists,
catalog: None,
database: Some(
Identifier {
span: Some(
28..33,
),
name: "test2",
quote: None,
is_hole: false,
},
),
stream: Identifier {
span: Some(
34..36,
),
name: "s3",
quote: None,
is_hole: false,
},
table_database: Some(
Identifier {
span: Some(
46..50,
),
name: "test",
quote: None,
is_hole: false,
},
),
table: Identifier {
span: Some(
51..52,
),
name: "t",
quote: None,
is_hole: false,
},
stream_point: Some(
AtPoint(
Timestamp(
Cast {
span: Some(
98..109,
),
expr: Literal {
span: Some(
70..98,
),
lit: String(
"2023-06-26 09:49:02.038483",
),
},
target_type: Timestamp,
pg_style: true,
},
),
),
),
append_only: false,
comment: None,
},
)


---------- Input ----------
create stream if not exists test2.s3 on table test.t at (SNAPSHOT => '9828b23f74664ff3806f44bbc1925ea5') append_only = true;
---------- Output ---------
CREATE STREAM IF NOT EXISTS test2.s3 ON TABLE test.t AT (SNAPSHOT => '9828b23f74664ff3806f44bbc1925ea5')
---------- AST ------------
CreateStream(
CreateStreamStmt {
create_option: CreateIfNotExists,
catalog: None,
database: Some(
Identifier {
span: Some(
28..33,
),
name: "test2",
quote: None,
is_hole: false,
},
),
stream: Identifier {
span: Some(
34..36,
),
name: "s3",
quote: None,
is_hole: false,
},
table_database: Some(
Identifier {
span: Some(
46..50,
),
name: "test",
quote: None,
is_hole: false,
},
),
table: Identifier {
span: Some(
51..52,
),
name: "t",
quote: None,
is_hole: false,
},
stream_point: Some(
AtPoint(
Snapshot(
"9828b23f74664ff3806f44bbc1925ea5",
),
),
),
append_only: true,
comment: None,
},
)


---------- Input ----------
create or replace stream test2.s1 on table test.t append_only = false;
---------- Output ---------
Expand Down
1 change: 1 addition & 0 deletions src/query/ee/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ mod background_service;
mod inverted_index;
mod license;
mod storages;
mod stream;
15 changes: 15 additions & 0 deletions src/query/ee/tests/it/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2023 Databend Cloud
//
// Licensed under the Elastic License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.elastic.co/licensing/elastic-license
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod stream_create;
76 changes: 76 additions & 0 deletions src/query/ee/tests/it/stream/stream_create.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2023 Databend Cloud
//
// Licensed under the Elastic License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.elastic.co/licensing/elastic-license
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono::Duration;
use chrono::Utc;
use databend_common_base::base::tokio;
use databend_common_exception::Result;
use databend_enterprise_query::test_kits::context::EESetup;
use databend_query::test_kits::generate_snapshots;
use databend_query::test_kits::TestFixture;

#[tokio::test(flavor = "multi_thread")]
async fn test_stream_create() -> Result<()> {
let fixture = TestFixture::setup_with_custom(EESetup::new()).await?;
fixture.create_default_database().await?;
fixture.create_normal_table().await?;

let db = fixture.default_db_name();
let tbl = fixture.default_table_name();
{
let qry = format!(
"alter table {}.{} set options(change_tracking=true)",
db, tbl
);
let r = fixture.execute_command(&qry).await;
assert!(r.is_ok());
}

generate_snapshots(&fixture).await?;
let now = Utc::now();
{
let time_point = now - Duration::hours(14);
let qry = format!(
"create stream s on table {}.{} at(timestamp => '{:?}'::TIMESTAMP)",
db, tbl, time_point
);
let r = fixture.execute_command(&qry).await;
assert!(r.is_err());
let expect = "TableHistoricalDataNotFound. Code: 2013, Text = No historical data found at given point.";
assert_eq!(expect, format!("{}", r.unwrap_err()));
}

{
let time_point = now - Duration::hours(12);
let qry = format!(
"create stream s on table {}.{} at(timestamp => '{:?}'::TIMESTAMP)",
db, tbl, time_point
);
let r = fixture.execute_command(&qry).await;
assert!(r.is_err());
let expect = "IllegalStream. Code: 2733, Text = The stream navigation at point has not table version.";
assert_eq!(expect, format!("{}", r.unwrap_err()));
}

{
let qry = format!(
"create stream s on table {}.{} append_only = false",
db, tbl
);
let r = fixture.execute_command(&qry).await;
assert!(r.is_ok());
}

Ok(())
}
7 changes: 7 additions & 0 deletions tests/suites/5_ee/05_stream/05_0002_ee_stream_create.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Error: APIError: ResponseError with 2733: Change tracking has been missing for the time range requested on table 'db_stream.base'
3 INSERT false true
Error: APIError: ResponseError with 2733: Change tracking has been missing for the time range requested on table 'db_stream.base'
3 INSERT false true
Error: APIError: ResponseError with 2733: Change tracking has been missing for the time range requested on table 'db_stream.base'
Error: APIError: ResponseError with 2733: Change tracking has been missing for the time range requested on table 'db_stream.base'
Error: APIError: ResponseError with 2733: Change tracking has been missing for the time range requested on table 'db_stream.base'
40 changes: 40 additions & 0 deletions tests/suites/5_ee/05_stream/05_0002_ee_stream_create.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/usr/bin/env bash

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../../../shell_env.sh

echo "drop database if exists db_stream" | $BENDSQL_CLIENT_CONNECT
echo "create database db_stream" | $BENDSQL_CLIENT_CONNECT
echo "create table db_stream.base(a int)" | $BENDSQL_CLIENT_CONNECT
echo "insert into db_stream.base values(1)" | $BENDSQL_CLIENT_CONNECT
echo "alter table db_stream.base set options(change_tracking = true)" | $BENDSQL_CLIENT_CONNECT
echo "insert into db_stream.base values(2)" | $BENDSQL_CLIENT_CONNECT
echo "insert into db_stream.base values(3)" | $BENDSQL_CLIENT_CONNECT

BASE_ROW_ID=$(echo "select _base_row_id from db_stream.base where a = 3" | $BENDSQL_CLIENT_CONNECT)

SNAPSHOT_ID_1=$(echo "select snapshot_id from fuse_snapshot('db_stream','base') where row_count=1 limit 1" | $BENDSQL_CLIENT_CONNECT)
echo "create stream db_stream.s1 on table db_stream.base at (snapshot => '$SNAPSHOT_ID_1')" | $BENDSQL_CLIENT_CONNECT

SNAPSHOT_ID_2=$(echo "select snapshot_id from fuse_snapshot('db_stream','base') where row_count=2 limit 1" | $BENDSQL_CLIENT_CONNECT)
echo "create stream db_stream.s2 on table db_stream.base at (snapshot => '$SNAPSHOT_ID_2')" | $BENDSQL_CLIENT_CONNECT
echo "select a, change\$action, change\$is_update, change\$row_id='$BASE_ROW_ID' from db_stream.s2" | $BENDSQL_CLIENT_CONNECT

TIMEPOINT_1=$(echo "select timestamp from fuse_snapshot('db_stream','base') where row_count=1 limit 1" | $BENDSQL_CLIENT_CONNECT)
echo "create stream db_stream.t1 on table db_stream.base at (timestamp => '$TIMEPOINT_1'::TIMESTAMP)" | $BENDSQL_CLIENT_CONNECT

echo "alter table db_stream.base set options(change_tracking = true)" | $BENDSQL_CLIENT_CONNECT
TIMEPOINT_2=$(echo "select timestamp from fuse_snapshot('db_stream','base') where row_count=2 limit 1" | $BENDSQL_CLIENT_CONNECT)
echo "create stream db_stream.t2 on table db_stream.base at (timestamp => '$TIMEPOINT_2'::TIMESTAMP)" | $BENDSQL_CLIENT_CONNECT
echo "select a, change\$action, change\$is_update, change\$row_id='$BASE_ROW_ID' from db_stream.t2" | $BENDSQL_CLIENT_CONNECT

echo "alter table db_stream.base set options(change_tracking = false)" | $BENDSQL_CLIENT_CONNECT
echo "create stream db_stream.s3 on table db_stream.base at (snapshot => '$SNAPSHOT_ID_2')" | $BENDSQL_CLIENT_CONNECT
echo "alter table db_stream.base set options(change_tracking = true)" | $BENDSQL_CLIENT_CONNECT
echo "create stream db_stream.s4 on table db_stream.base at (stream => db_stream.s2)" | $BENDSQL_CLIENT_CONNECT
echo "select a from db_stream.s2" | $BENDSQL_CLIENT_CONNECT

echo "drop stream if exists db_stream.s2" | $BENDSQL_CLIENT_CONNECT
echo "drop stream if exists db_stream.t2" | $BENDSQL_CLIENT_CONNECT
echo "drop table if exists db_stream.base all" | $BENDSQL_CLIENT_CONNECT
echo "drop database if exists db_stream" | $BENDSQL_CLIENT_CONNECT
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
true
false
false
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ echo "drop database if exists db_stream" | $BENDSQL_CLIENT_CONNECT
echo "CREATE DATABASE db_stream" | $BENDSQL_CLIENT_CONNECT
echo "create table db_stream.t(a int)" | $BENDSQL_CLIENT_CONNECT
echo "create stream default.s1 on table db_stream.t comment = 'test'" | $BENDSQL_CLIENT_CONNECT
echo "create stream db_stream.s2 on table db_stream.t comment = 'test'" | $BENDSQL_CLIENT_CONNECT
echo "create stream db_stream.s2 on table db_stream.t at(stream => default.s1)" | $BENDSQL_CLIENT_CONNECT

# When s1 is created, t automatically turns on change tracking. So the offset of s1 and the version of t do not match.
curl -X GET -s http://localhost:8081/v1/tenants/system/stream_status\?stream_name=s1 | jq .has_data
curl -X GET -s http://localhost:8081/v1/tenants/system/stream_status\?stream_name\=s2\&database\=db_stream | jq .has_data

echo "drop stream if exists default.s1" | $BENDSQL_CLIENT_CONNECT
echo "drop stream if exists db_stream.s2" | $BENDSQL_CLIENT_CONNECT
echo "drop table if exists db_stream.t" | $BENDSQL_CLIENT_CONNECT
echo "drop table if exists db_stream.t all" | $BENDSQL_CLIENT_CONNECT
echo "drop database if exists db_stream" | $BENDSQL_CLIENT_CONNECT

0 comments on commit a5090b5

Please sign in to comment.