Skip to content

Commit

Permalink
Merge pull request #6344 from junnplus/sync-stage
Browse files Browse the repository at this point in the history
feat: add call procedure for sync stage
  • Loading branch information
BohuTANG authored Jun 30, 2022
2 parents 1114140 + 5b6482b commit 34f4797
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 0 deletions.
2 changes: 2 additions & 0 deletions query/src/interpreters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ pub use interpreter::InterpreterPtr;
pub use interpreter_call::CallInterpreter;
pub use interpreter_cluster_key_alter::AlterTableClusterKeyInterpreter;
pub use interpreter_cluster_key_drop::DropTableClusterKeyInterpreter;
pub use interpreter_common::list_files_from_dal;
pub use interpreter_common::list_files_from_meta_api;
pub use interpreter_copy::CopyInterpreter;
pub use interpreter_database_create::CreateDatabaseInterpreter;
pub use interpreter_database_drop::DropDatabaseInterpreter;
Expand Down
2 changes: 2 additions & 0 deletions query/src/procedures/systems/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ mod clustering_information;
mod fuse_segment;
mod fuse_snapshot;
mod search_tables;
mod sync_stage;
mod system;

pub use clustering_information::ClusteringInformationProcedure;
pub use fuse_segment::FuseSegmentProcedure;
pub use fuse_snapshot::FuseSnapshotProcedure;
pub use search_tables::SearchTablesProcedure;
pub use sync_stage::SyncStageFileProcedure;
pub use system::SystemProcedure;
77 changes: 77 additions & 0 deletions query/src/procedures/systems/sync_stage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datavalues::DataSchema;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::StageType;

use crate::interpreters::list_files_from_dal;
use crate::procedures::Procedure;
use crate::procedures::ProcedureFeatures;
use crate::sessions::QueryContext;

/// Stateless procedure type behind `SYNC_STAGE_FILE`; all behavior lives in
/// its `Procedure` implementation.
pub struct SyncStageFileProcedure;

impl SyncStageFileProcedure {
    /// Builds a boxed instance of the procedure.
    ///
    /// The procedure carries no state, so construction always succeeds; the
    /// `Result` wrapper is kept as part of the existing signature.
    pub fn try_create() -> Result<Box<dyn Procedure>> {
        let procedure: Box<dyn Procedure> = Box::new(Self);
        Ok(procedure)
    }
}

#[async_trait::async_trait]
impl Procedure for SyncStageFileProcedure {
    /// Name reported for this procedure.
    fn name(&self) -> &str {
        "SYNC_STAGE_FILE"
    }

    /// Declares the two arguments read by `inner_eval`: the stage name and a
    /// path inside that stage.
    fn features(&self) -> ProcedureFeatures {
        ProcedureFeatures::default().num_arguments(2)
    }

    /// Lists the files found under `args[1]` in the internal stage named
    /// `args[0]` and registers each of them through the user manager.
    ///
    /// # Errors
    ///
    /// Returns `ErrorCode::BadArguments` when the stage is not an internal
    /// stage, and propagates any error from looking up the stage or listing
    /// its files. Failures while registering an individual file are ignored.
    async fn inner_eval(&self, ctx: Arc<QueryContext>, args: Vec<String>) -> Result<DataBlock> {
        // NOTE(review): indexing assumes the caller has already enforced the
        // argument count declared in `features()` — confirm.
        let stage_name = &args[0];
        let path = args[1].trim_start_matches('/');

        let tenant = ctx.get_tenant();
        let user_mgr = ctx.get_user_manager();

        let stage = user_mgr.get_stage(&tenant, stage_name).await?;
        if stage.stage_type != StageType::Internal {
            return Err(ErrorCode::BadArguments("only support internal stage"));
        }

        let prefix = stage.get_prefix();
        let path = format!("{prefix}{path}");
        let files = list_files_from_dal(&ctx, &stage, &path, "").await?;

        // Listed paths are compared without their leading '/', so normalize the
        // stage prefix the same way once, outside the loop.
        let stage_root = prefix.trim_start_matches('/');
        for file in files.iter() {
            let mut file = file.clone();
            let listed = file.path.trim_start_matches('/');
            // Remove the stage prefix exactly once. `trim_start_matches` would
            // strip every repeated occurrence and corrupt the path of a file
            // stored under a directory that itself starts with the prefix text.
            file.path = listed.strip_prefix(stage_root).unwrap_or(listed).to_string();
            // Best effort: a failure to register one file does not abort the
            // sync (presumably so that already-registered files are tolerated
            // on a re-run — TODO confirm).
            let _ = user_mgr.add_file(&tenant, &stage.stage_name, file).await;
        }
        Ok(DataBlock::empty())
    }

    /// The procedure produces no columns, so its schema is empty.
    fn schema(&self) -> Arc<DataSchema> {
        Arc::new(DataSchema::empty())
    }
}
5 changes: 5 additions & 0 deletions query/src/procedures/systems/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::SyncStageFileProcedure;
use crate::procedures::systems::ClusteringInformationProcedure;
use crate::procedures::systems::FuseSegmentProcedure;
use crate::procedures::systems::FuseSnapshotProcedure;
Expand All @@ -38,5 +39,9 @@ impl SystemProcedure {
"system$search_tables",
Box::new(SearchTablesProcedure::try_create),
);
factory.register(
"system$sync_stage_file",
Box::new(SyncStageFileProcedure::try_create),
)
}
}
10 changes: 10 additions & 0 deletions tests/suites/1_stateful/01_load/01_0003_sync_stage_file.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
=== List files in internal stage ===
ontime_200.csv
=== List stage files after sync ===
ontime_200.csv
ontime_200.csv.gz
dir/ontime_200.csv
dir/ontime_200.csv.gz
dir/ontime_200.csv.zst
ontime_200.csv
ontime_200.csv.gz
31 changes: 31 additions & 0 deletions tests/suites/1_stateful/01_load/01_0003_sync_stage_file.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/usr/bin/env bash

# Stateful test for the `system$sync_stage_file` procedure: objects are copied
# into the storage location of stage `test_sync` with the aws CLI, the
# procedure is called, and the stage listing is printed after each step.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../../../shell_env.sh

# Send one SQL statement to the server through the configured MySQL client.
run_sql() {
	echo "$1" | $MYSQL_CLIENT_CONNECT
}

# Copy fixture object $1 to $2 under the test_sync stage location; all aws
# output is discarded.
upload_to_stage() {
	local source_key="$1"
	local stage_key="$2"
	aws --endpoint-url http://127.0.0.1:9900/ s3 cp "s3://testbucket/admin/data/${source_key}" "s3://testbucket/admin/stage/test_sync/${stage_key}" >/dev/null 2>&1
}

# Print the first column of the stage listing, sorted.
list_stage_files() {
	run_sql "list @test_sync" | awk '{print $1}' | sort
}

run_sql "drop stage if exists test_sync"

run_sql "CREATE STAGE test_sync;"

upload_to_stage ontime_200.csv ontime_200.csv

## List files in internal stage
echo "=== List files in internal stage ==="
list_stage_files

upload_to_stage ontime_200.csv.gz ontime_200.csv.gz
upload_to_stage ontime_200.csv.zst ontime_200.csv.zst

run_sql "call system\$sync_stage_file('test_sync', 'ontime_200.csv.gz')"

echo "=== List stage files after sync ==="
list_stage_files

upload_to_stage ontime_200.csv dir/ontime_200.csv
upload_to_stage ontime_200.csv.gz dir/ontime_200.csv.gz
upload_to_stage ontime_200.csv.zst dir/ontime_200.csv.zst

# No header is echoed before this listing: the expected output has none.
run_sql "call system\$sync_stage_file('test_sync', 'dir')"
list_stage_files

run_sql "drop stage test_sync"

0 comments on commit 34f4797

Please sign in to comment.