Skip to content

Commit

Permalink
feat: add call procedure for sync stage
Browse files Browse the repository at this point in the history
Signed-off-by: Ye Sijun <junnplus@gmail.com>
  • Loading branch information
junnplus committed Jun 29, 2022
1 parent f7bf6f5 commit 6ac837f
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 0 deletions.
2 changes: 2 additions & 0 deletions query/src/interpreters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ pub use interpreter::InterpreterPtr;
pub use interpreter_call::CallInterpreter;
pub use interpreter_cluster_key_alter::AlterTableClusterKeyInterpreter;
pub use interpreter_cluster_key_drop::DropTableClusterKeyInterpreter;
pub use interpreter_common::list_files_from_dal;
pub use interpreter_common::list_files_from_meta_api;
pub use interpreter_copy::CopyInterpreter;
pub use interpreter_database_create::CreateDatabaseInterpreter;
pub use interpreter_database_drop::DropDatabaseInterpreter;
Expand Down
2 changes: 2 additions & 0 deletions query/src/procedures/systems/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ mod clustering_information;
mod fuse_segment;
mod fuse_snapshot;
mod search_tables;
mod sync_stage;
mod system;

pub use clustering_information::ClusteringInformationProcedure;
pub use fuse_segment::FuseSegmentProcedure;
pub use fuse_snapshot::FuseSnapshotProcedure;
pub use search_tables::SearchTablesProcedure;
pub use sync_stage::SyncStageFileProcedure;
pub use system::SystemProcedure;
77 changes: 77 additions & 0 deletions query/src/procedures/systems/sync_stage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datavalues::DataSchema;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::StageType;

use crate::interpreters::list_files_from_dal;
use crate::procedures::Procedure;
use crate::procedures::ProcedureFeatures;
use crate::sessions::QueryContext;

pub struct SyncStageFileProcedure;

impl SyncStageFileProcedure {
pub fn try_create() -> Result<Box<dyn Procedure>> {
Ok(Box::new(SyncStageFileProcedure {}))
}
}

#[async_trait::async_trait]
impl Procedure for SyncStageFileProcedure {
fn name(&self) -> &str {
"SYNC_STAGE_FILE"
}

fn features(&self) -> ProcedureFeatures {
ProcedureFeatures::default().num_arguments(2)
}

async fn inner_eval(&self, ctx: Arc<QueryContext>, args: Vec<String>) -> Result<DataBlock> {
let stage_name = args[0].clone();
let path = args[1].clone();
let path = path.trim_start_matches('/');

let tenant = ctx.get_tenant();
let user_mgr = ctx.get_user_manager();

let stage = user_mgr.get_stage(&tenant, &stage_name).await?;
if stage.stage_type != StageType::Internal {
return Err(ErrorCode::BadArguments("only support internal stage"));
}

let prefix = stage.get_prefix();
let path = format!("{prefix}{path}");
let files = list_files_from_dal(&ctx, &stage, &path, "").await?;
for file in files.iter() {
let mut file = file.clone();
file.path = file
.path
.trim_start_matches('/')
.trim_start_matches(prefix.trim_start_matches('/'))
.to_string();
let _ = user_mgr.add_file(&tenant, &stage.stage_name, file).await;
}
Ok(DataBlock::empty())
}

fn schema(&self) -> Arc<DataSchema> {
Arc::new(DataSchema::empty())
}
}
5 changes: 5 additions & 0 deletions query/src/procedures/systems/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::SyncStageFileProcedure;
use crate::procedures::systems::ClusteringInformationProcedure;
use crate::procedures::systems::FuseSegmentProcedure;
use crate::procedures::systems::FuseSnapshotProcedure;
Expand All @@ -38,5 +39,9 @@ impl SystemProcedure {
"system$search_tables",
Box::new(SearchTablesProcedure::try_create),
);
factory.register(
"system$sync_stage_file",
Box::new(SyncStageFileProcedure::try_create),
)
}
}

0 comments on commit 6ac837f

Please sign in to comment.