Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Mar 21, 2024
1 parent b6adf2f commit ee462e1
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
25 changes: 17 additions & 8 deletions src/query/ee/src/stream/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use databend_common_meta_types::MatchSeq;
use databend_common_sql::plans::CreateStreamPlan;
use databend_common_sql::plans::DropStreamPlan;
use databend_common_sql::plans::StreamNavigation;
use databend_common_storages_fuse::io::SnapshotsIO;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::TableContext;
use databend_common_storages_stream::stream_table::StreamTable;
Expand Down Expand Up @@ -128,17 +129,25 @@ impl StreamHandler for RealStreamHandler {
}
}
Some(StreamNavigation::AtPoint(point)) => {
if plan.append_only {
return Err(ErrorCode::IllegalStream(
"The stream navigation at point is not supported in append only mode"
.to_string(),
));
}

let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let source = fuse_table.navigate_to(point).await?;
if let Some(snapshot_loc) = source.snapshot_loc().await? {
options.insert(OPT_KEY_SNAPSHOT_LOCATION.to_string(), snapshot_loc);
options.insert(OPT_KEY_SNAPSHOT_LOCATION.to_string(), snapshot_loc.clone());
let (snapshot, _) =
SnapshotsIO::read_snapshot(snapshot_loc, fuse_table.get_operator()).await?;
if let Some(table_version) = snapshot.table_version {
// The table version is the version of the table when the snapshot was created.
// We need make sure the version greater than the table version,
// and less equal than the table version after the snapshot commit.
let version = table_version + 1;
options.insert(OPT_KEY_TABLE_VER.to_string(), version.to_string());
} else if plan.append_only {
return Err(ErrorCode::IllegalStream(
"The stream navigation at point has not table version".to_string(),
));
}
} else {
unreachable!()
}
}
None => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo
| 'table_schema' | 'information_schema' | 'tables' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
| 'table_schema' | 'information_schema' | 'views' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
| 'table_type' | 'information_schema' | 'tables' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
| 'table_version' | 'system' | 'streams' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' |
| 'table_version' | 'system' | 'streams' | 'Nullable(UInt64)' | 'BIGINT UNSIGNED' | '' | '' | 'YES' | '' |
| 'tables' | 'system' | 'query_log' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
| 'target_features' | 'system' | 'build_options' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
| 'task_running_secs' | 'system' | 'background_tasks' | 'Nullable(UInt64)' | 'BIGINT UNSIGNED' | '' | '' | 'YES' | '' |
Expand Down

0 comments on commit ee462e1

Please sign in to comment.