diff --git a/query/src/interpreters/mod.rs b/query/src/interpreters/mod.rs index d0e15111ecf13..a495aeb96197e 100644 --- a/query/src/interpreters/mod.rs +++ b/query/src/interpreters/mod.rs @@ -97,6 +97,8 @@ pub use interpreter::InterpreterPtr; pub use interpreter_call::CallInterpreter; pub use interpreter_cluster_key_alter::AlterTableClusterKeyInterpreter; pub use interpreter_cluster_key_drop::DropTableClusterKeyInterpreter; +pub use interpreter_common::list_files_from_dal; +pub use interpreter_common::list_files_from_meta_api; pub use interpreter_copy::CopyInterpreter; pub use interpreter_database_create::CreateDatabaseInterpreter; pub use interpreter_database_drop::DropDatabaseInterpreter; diff --git a/query/src/procedures/systems/mod.rs b/query/src/procedures/systems/mod.rs index 631819212304c..222a839fb1652 100644 --- a/query/src/procedures/systems/mod.rs +++ b/query/src/procedures/systems/mod.rs @@ -16,10 +16,12 @@ mod clustering_information; mod fuse_segment; mod fuse_snapshot; mod search_tables; +mod sync_stage; mod system; pub use clustering_information::ClusteringInformationProcedure; pub use fuse_segment::FuseSegmentProcedure; pub use fuse_snapshot::FuseSnapshotProcedure; pub use search_tables::SearchTablesProcedure; +pub use sync_stage::SyncStageFileProcedure; pub use system::SystemProcedure; diff --git a/query/src/procedures/systems/sync_stage.rs b/query/src/procedures/systems/sync_stage.rs new file mode 100644 index 0000000000000..5f7b10a51dfcd --- /dev/null +++ b/query/src/procedures/systems/sync_stage.rs @@ -0,0 +1,77 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_datablocks::DataBlock; +use common_datavalues::DataSchema; +use common_exception::ErrorCode; +use common_exception::Result; +use common_meta_types::StageType; + +use crate::interpreters::list_files_from_dal; +use crate::procedures::Procedure; +use crate::procedures::ProcedureFeatures; +use crate::sessions::QueryContext; + +pub struct SyncStageFileProcedure; + +impl SyncStageFileProcedure { + pub fn try_create() -> Result> { + Ok(Box::new(SyncStageFileProcedure {})) + } +} + +#[async_trait::async_trait] +impl Procedure for SyncStageFileProcedure { + fn name(&self) -> &str { + "SYNC_STAGE_FILE" + } + + fn features(&self) -> ProcedureFeatures { + ProcedureFeatures::default().num_arguments(2) + } + + async fn inner_eval(&self, ctx: Arc, args: Vec) -> Result { + let stage_name = args[0].clone(); + let path = args[1].clone(); + let path = path.trim_start_matches('/'); + + let tenant = ctx.get_tenant(); + let user_mgr = ctx.get_user_manager(); + + let stage = user_mgr.get_stage(&tenant, &stage_name).await?; + if stage.stage_type != StageType::Internal { + return Err(ErrorCode::BadArguments("only support internal stage")); + } + + let prefix = stage.get_prefix(); + let path = format!("{prefix}{path}"); + let files = list_files_from_dal(&ctx, &stage, &path, "").await?; + for file in files.iter() { + let mut file = file.clone(); + file.path = file + .path + .trim_start_matches('/') + .trim_start_matches(prefix.trim_start_matches('/')) + .to_string(); + let _ = user_mgr.add_file(&tenant, &stage.stage_name, file).await; + } + Ok(DataBlock::empty()) + } + + fn schema(&self) -> Arc { + Arc::new(DataSchema::empty()) + } +} diff --git a/query/src/procedures/systems/system.rs b/query/src/procedures/systems/system.rs index ec3f30ceaefe9..4518effa7df1f 100644 --- a/query/src/procedures/systems/system.rs +++ b/query/src/procedures/systems/system.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use super::SyncStageFileProcedure; use crate::procedures::systems::ClusteringInformationProcedure; use crate::procedures::systems::FuseSegmentProcedure; use crate::procedures::systems::FuseSnapshotProcedure; @@ -38,5 +39,9 @@ impl SystemProcedure { "system$search_tables", Box::new(SearchTablesProcedure::try_create), ); + factory.register( + "system$sync_stage_file", + Box::new(SyncStageFileProcedure::try_create), + ) } }