diff --git a/query/src/interpreters/mod.rs b/query/src/interpreters/mod.rs
index d0e15111ecf1..a495aeb96197 100644
--- a/query/src/interpreters/mod.rs
+++ b/query/src/interpreters/mod.rs
@@ -97,6 +97,8 @@ pub use interpreter::InterpreterPtr;
 pub use interpreter_call::CallInterpreter;
 pub use interpreter_cluster_key_alter::AlterTableClusterKeyInterpreter;
 pub use interpreter_cluster_key_drop::DropTableClusterKeyInterpreter;
+pub use interpreter_common::list_files_from_dal;
+pub use interpreter_common::list_files_from_meta_api;
 pub use interpreter_copy::CopyInterpreter;
 pub use interpreter_database_create::CreateDatabaseInterpreter;
 pub use interpreter_database_drop::DropDatabaseInterpreter;
diff --git a/query/src/procedures/systems/mod.rs b/query/src/procedures/systems/mod.rs
index 631819212304..222a839fb165 100644
--- a/query/src/procedures/systems/mod.rs
+++ b/query/src/procedures/systems/mod.rs
@@ -16,10 +16,12 @@ mod clustering_information;
 mod fuse_segment;
 mod fuse_snapshot;
 mod search_tables;
+mod sync_stage;
 mod system;
 
 pub use clustering_information::ClusteringInformationProcedure;
 pub use fuse_segment::FuseSegmentProcedure;
 pub use fuse_snapshot::FuseSnapshotProcedure;
 pub use search_tables::SearchTablesProcedure;
+pub use sync_stage::SyncStageFileProcedure;
 pub use system::SystemProcedure;
diff --git a/query/src/procedures/systems/sync_stage.rs b/query/src/procedures/systems/sync_stage.rs
new file mode 100644
index 000000000000..5f7b10a51dfc
--- /dev/null
+++ b/query/src/procedures/systems/sync_stage.rs
@@ -0,0 +1,103 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_datablocks::DataBlock;
+use common_datavalues::DataSchema;
+use common_exception::ErrorCode;
+use common_exception::Result;
+use common_meta_types::StageType;
+
+use crate::interpreters::list_files_from_dal;
+use crate::procedures::Procedure;
+use crate::procedures::ProcedureFeatures;
+use crate::sessions::QueryContext;
+
+/// System procedure `system$sync_stage_file(stage_name, path)`.
+///
+/// Lists the files stored under `path` in an internal stage and registers
+/// each of them through the user manager.
+pub struct SyncStageFileProcedure;
+
+impl SyncStageFileProcedure {
+    /// Creates the procedure as a boxed [`Procedure`] trait object.
+    pub fn try_create() -> Result<Box<dyn Procedure>> {
+        Ok(Box::new(SyncStageFileProcedure {}))
+    }
+}
+
+#[async_trait::async_trait]
+impl Procedure for SyncStageFileProcedure {
+    fn name(&self) -> &str {
+        "SYNC_STAGE_FILE"
+    }
+
+    /// Declares two arguments: the stage name and a path inside the stage.
+    fn features(&self) -> ProcedureFeatures {
+        ProcedureFeatures::default().num_arguments(2)
+    }
+
+    /// Registers every file found under `args[1]` of the internal stage
+    /// `args[0]`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `BadArguments` when the argument count is not two or when the
+    /// stage is not an internal stage, and propagates failures from the stage
+    /// lookup and from the file listing.
+    async fn inner_eval(&self, ctx: Arc<QueryContext>, args: Vec<String>) -> Result<DataBlock> {
+        let (stage_name, path) = match args.as_slice() {
+            [stage_name, path] => (stage_name, path.trim_start_matches('/')),
+            _ => {
+                return Err(ErrorCode::BadArguments(
+                    "expected two arguments: stage name and path",
+                ));
+            }
+        };
+
+        let tenant = ctx.get_tenant();
+        let user_mgr = ctx.get_user_manager();
+
+        let stage = user_mgr.get_stage(&tenant, stage_name).await?;
+        if stage.stage_type != StageType::Internal {
+            return Err(ErrorCode::BadArguments("only support internal stage"));
+        }
+
+        let prefix = stage.get_prefix();
+        let relative_prefix = prefix.trim_start_matches('/');
+        let path = format!("{prefix}{path}");
+        let files = list_files_from_dal(&ctx, &stage, &path, "").await?;
+        for mut file in files {
+            // Store the path relative to the stage root. `strip_prefix` removes
+            // the prefix at most once, whereas `trim_start_matches` would also
+            // eat a directory inside the stage that repeats the prefix.
+            let full_path = file.path.trim_start_matches('/');
+            file.path = full_path
+                .strip_prefix(relative_prefix)
+                .unwrap_or(full_path)
+                .to_string();
+            // NOTE(review): failures are ignored on purpose here, presumably so
+            // that re-syncing an already registered file is not fatal; confirm.
+            let _ = user_mgr.add_file(&tenant, &stage.stage_name, file).await;
+        }
+
+        Ok(DataBlock::empty())
+    }
+
+    /// Returns an empty schema.
+    fn schema(&self) -> Arc<DataSchema> {
+        Arc::new(DataSchema::empty())
+    }
+}
diff --git a/query/src/procedures/systems/system.rs b/query/src/procedures/systems/system.rs
index ec3f30ceaefe..4518effa7df1 100644
--- a/query/src/procedures/systems/system.rs
+++ b/query/src/procedures/systems/system.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use super::SyncStageFileProcedure;
 use crate::procedures::systems::ClusteringInformationProcedure;
 use crate::procedures::systems::FuseSegmentProcedure;
 use crate::procedures::systems::FuseSnapshotProcedure;
@@ -38,5 +39,9 @@ impl SystemProcedure {
             "system$search_tables",
             Box::new(SearchTablesProcedure::try_create),
         );
+        factory.register(
+            "system$sync_stage_file",
+            Box::new(SyncStageFileProcedure::try_create),
+        );
     }
 }
diff --git a/tests/suites/1_stateful/01_load/01_0003_sync_stage_file.result b/tests/suites/1_stateful/01_load/01_0003_sync_stage_file.result
new file mode 100755
index 000000000000..d2647b19234a
--- /dev/null
+++ b/tests/suites/1_stateful/01_load/01_0003_sync_stage_file.result
@@ -0,0 +1,10 @@
+=== List files in internal stage ===
+ontime_200.csv
+=== List stage files after sync ===
+ontime_200.csv
+ontime_200.csv.gz
+dir/ontime_200.csv
+dir/ontime_200.csv.gz
+dir/ontime_200.csv.zst
+ontime_200.csv
+ontime_200.csv.gz
diff --git a/tests/suites/1_stateful/01_load/01_0003_sync_stage_file.sh b/tests/suites/1_stateful/01_load/01_0003_sync_stage_file.sh
new file mode 100755
index 000000000000..aa8dd77fa99d
--- /dev/null
+++ b/tests/suites/1_stateful/01_load/01_0003_sync_stage_file.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. "$CURDIR"/../../../shell_env.sh
+
+echo "drop stage if exists test_sync" | $MYSQL_CLIENT_CONNECT
+
+echo "CREATE STAGE test_sync;" | $MYSQL_CLIENT_CONNECT
+
+aws --endpoint-url http://127.0.0.1:9900/ s3 cp s3://testbucket/admin/data/ontime_200.csv s3://testbucket/admin/stage/test_sync/ontime_200.csv >/dev/null 2>&1
+
+## List files in internal stage
+echo "=== List files in internal stage ==="
+echo "list @test_sync" | $MYSQL_CLIENT_CONNECT | awk '{print $1}' | sort
+
+aws --endpoint-url http://127.0.0.1:9900/ s3 cp s3://testbucket/admin/data/ontime_200.csv.gz s3://testbucket/admin/stage/test_sync/ontime_200.csv.gz >/dev/null 2>&1
+aws --endpoint-url http://127.0.0.1:9900/ s3 cp s3://testbucket/admin/data/ontime_200.csv.zst s3://testbucket/admin/stage/test_sync/ontime_200.csv.zst >/dev/null 2>&1
+
+echo "call system\$sync_stage_file('test_sync', 'ontime_200.csv.gz')" | $MYSQL_CLIENT_CONNECT
+
+echo "=== List stage files after sync ==="
+echo "list @test_sync" | $MYSQL_CLIENT_CONNECT | awk '{print $1}' | sort
+
+aws --endpoint-url http://127.0.0.1:9900/ s3 cp s3://testbucket/admin/data/ontime_200.csv s3://testbucket/admin/stage/test_sync/dir/ontime_200.csv >/dev/null 2>&1
+aws --endpoint-url http://127.0.0.1:9900/ s3 cp s3://testbucket/admin/data/ontime_200.csv.gz s3://testbucket/admin/stage/test_sync/dir/ontime_200.csv.gz >/dev/null 2>&1
+aws --endpoint-url http://127.0.0.1:9900/ s3 cp s3://testbucket/admin/data/ontime_200.csv.zst s3://testbucket/admin/stage/test_sync/dir/ontime_200.csv.zst >/dev/null 2>&1
+
+echo "call system\$sync_stage_file('test_sync', 'dir')" | $MYSQL_CLIENT_CONNECT
+echo "list @test_sync" | $MYSQL_CLIENT_CONNECT | awk '{print $1}' | sort
+
+echo "drop stage test_sync" | $MYSQL_CLIENT_CONNECT