From bd2d27f2ac3cbeaf76a97c01b0100a764c110c9d Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 31 Oct 2022 15:13:47 +0800 Subject: [PATCH 1/6] feat: Add User Stage Type Signed-off-by: Xuanwo --- .../proto-conv/src/user_from_to_protobuf_impl.rs | 2 ++ src/meta/protos/proto/user.proto | 1 + src/meta/types/src/user_stage.rs | 8 ++++++++ .../storages/preludes/src/system/stages_table.rs | 12 ++---------- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs index 9a26b5342cca..b598c1ef6a9b 100644 --- a/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs @@ -402,6 +402,7 @@ impl FromToProto for mt::StageType { pb::user_stage_info::StageType::LegacyInternal => Ok(mt::StageType::LegacyInternal), pb::user_stage_info::StageType::External => Ok(mt::StageType::External), pb::user_stage_info::StageType::Internal => Ok(mt::StageType::Internal), + pb::user_stage_info::StageType::User => Ok(mt::StageType::User), } } @@ -410,6 +411,7 @@ impl FromToProto for mt::StageType { mt::StageType::LegacyInternal => Ok(pb::user_stage_info::StageType::LegacyInternal), mt::StageType::External => Ok(pb::user_stage_info::StageType::External), mt::StageType::Internal => Ok(pb::user_stage_info::StageType::Internal), + mt::StageType::User => Ok(pb::user_stage_info::StageType::User), } } } diff --git a/src/meta/protos/proto/user.proto b/src/meta/protos/proto/user.proto index df1e36d45132..344518f9171f 100644 --- a/src/meta/protos/proto/user.proto +++ b/src/meta/protos/proto/user.proto @@ -127,6 +127,7 @@ message UserStageInfo { LegacyInternal = 0; External = 1; Internal = 2; + User = 3; } message StageStorage { diff --git a/src/meta/types/src/user_stage.rs b/src/meta/types/src/user_stage.rs index 708957120ce6..d7e2b2c64d75 100644 --- a/src/meta/types/src/user_stage.rs +++ b/src/meta/types/src/user_stage.rs @@ -59,6 +59,10 @@ pub enum StageType { LegacyInternal, External, Internal, + /// User Stage is the stage for every sql user. + /// + /// This is a stage that just in memory. We will not persist in metasrv + User, } impl fmt::Display for StageType { @@ -68,6 +72,7 @@ impl fmt::Display for StageType { StageType::LegacyInternal => "Internal", StageType::External => "External", StageType::Internal => "Internal", + StageType::User => "User", }; write!(f, "{}", name) } @@ -245,7 +250,9 @@ pub struct UserStageInfo { pub file_format_options: FileFormatOptions, pub copy_options: CopyOptions, pub comment: String, + /// TODO(xuanwo): stage doesn't have this info anymore, remove it. pub number_of_files: u64, + /// TODO(xuanwo): stage doesn't have this info anymore, remove it. pub creator: Option, } @@ -273,6 +280,7 @@ impl UserStageInfo { unreachable!("stage_prefix should never be called on external stage, must be a bug") } StageType::Internal => format!("/stage/internal/{}/", self.stage_name), + StageType::User => format!("/stage/user/{}/", self.stage_name), } } } diff --git a/src/query/storages/preludes/src/system/stages_table.rs b/src/query/storages/preludes/src/system/stages_table.rs index 15f493b4ca00..21d18ad37ad1 100644 --- a/src/query/storages/preludes/src/system/stages_table.rs +++ b/src/query/storages/preludes/src/system/stages_table.rs @@ -22,7 +22,6 @@ use common_exception::Result; use common_meta_app::schema::TableIdent; use common_meta_app::schema::TableInfo; use common_meta_app::schema::TableMeta; -use common_meta_types::StageType; use common_users::UserApiProvider; use super::table::AsyncOneBlockSystemTable; @@ -59,15 +58,8 @@ impl AsyncSystemTable for StagesTable { stage_params.push(format!("{:?}", stage.stage_params).into_bytes()); copy_options.push(format!("{:?}", stage.copy_options).into_bytes()); file_format_options.push(format!("{:?}", stage.file_format_options).into_bytes()); - match stage.stage_type { - StageType::LegacyInternal | StageType::Internal => { - number_of_files.push(Some(stage.number_of_files)); - } - StageType::External => { - number_of_files.push(None); - } - }; - creator.push(stage.creator.map(|c| c.to_string().into_bytes())); + number_of_files.push(None); + creator.push(None); comment.push(stage.comment.clone().into_bytes()); } Ok(DataBlock::create(self.table_info.schema(), vec![ From c6e62aed2fee745abb9bf59f86f0ccd6202cece8 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 31 Oct 2022 15:20:05 +0800 Subject: [PATCH 2/6] Fix test for meta Signed-off-by: Xuanwo --- src/meta/proto-conv/src/util.rs | 1 + .../proto-conv/tests/it/user_proto_conv.rs | 30 ++++++++++++ src/meta/proto-conv/tests/it/user_stage.rs | 46 +++++++++++++++++++ 3 files changed, 77 insertions(+) diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index d6ccd77f84de..2faa147f1a2b 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -65,6 +65,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (16, "2022-09-29: Add: CopyOptions::split_size"), (17, "2022-10-28: Add: StageType::LegacyInternal"), (18, "2022-10-28: Add: FILEFormatOptions::escape"), + (19, "2022-10-31: Add: StageType::UserStage"), ]; pub const VER: u64 = META_CHANGE_LOG.last().unwrap().0; diff --git a/src/meta/proto-conv/tests/it/user_proto_conv.rs b/src/meta/proto-conv/tests/it/user_proto_conv.rs index 472297c69edd..0b0bfefd923a 100644 --- a/src/meta/proto-conv/tests/it/user_proto_conv.rs +++ b/src/meta/proto-conv/tests/it/user_proto_conv.rs @@ -679,3 +679,33 @@ pub(crate) fn test_internal_stage_info_v17() -> mt::UserStageInfo { ..Default::default() } } + +pub(crate) fn test_user_stage_info_v18() -> mt::UserStageInfo { + mt::UserStageInfo { + stage_name: "root".to_string(), + stage_type: mt::StageType::User, + stage_params: mt::StageParams { + storage: StorageParams::Fs(StorageFsConfig { + root: "/dir/to/files".to_string(), + }), + }, + file_format_options: mt::FileFormatOptions { + format: mt::StageFileFormatType::Json, + skip_header: 1024, + field_delimiter: "|".to_string(), + record_delimiter: "//".to_string(), + escape: "".to_string(), + compression: mt::StageFileCompression::Bz2, + }, + copy_options: mt::CopyOptions { + on_error: mt::OnErrorMode::SkipFileNum(666), + size_limit: 1038, + split_size: 0, + purge: true, + single: false, + max_file_size: 0, + }, + comment: "test".to_string(), + ..Default::default() + } +} diff --git a/src/meta/proto-conv/tests/it/user_stage.rs b/src/meta/proto-conv/tests/it/user_stage.rs index 7dd46f6e4425..74e46a5f8b3f 100644 --- a/src/meta/proto-conv/tests/it/user_stage.rs +++ b/src/meta/proto-conv/tests/it/user_stage.rs @@ -27,6 +27,7 @@ use crate::user_proto_conv::test_gcs_stage_info; use crate::user_proto_conv::test_internal_stage_info_v17; use crate::user_proto_conv::test_oss_stage_info; use crate::user_proto_conv::test_s3_stage_info; +use crate::user_proto_conv::test_user_stage_info_v18; #[test] fn test_user_stage_fs_latest() -> anyhow::Result<()> { @@ -851,3 +852,48 @@ fn test_internal_stage_v17() -> anyhow::Result<()> { common::test_load_old(func_name!(), internal_stage_v17.as_slice(), want)?; Ok(()) } + +#[test] +fn test_user_stage_v18() -> anyhow::Result<()> { + common::test_pb_from_to("user_stage_v18", test_user_stage_info_v18())?; + + // Encoded data of version v17 of internal: + // It is generated with common::test_pb_from_to. + let user_stage_v18 = vec![ + 10, 4, 114, 111, 111, 116, 16, 3, 26, 25, 10, 23, 18, 21, 10, 13, 47, 100, 105, 114, 47, + 116, 111, 47, 102, 105, 108, 101, 115, 160, 6, 19, 168, 6, 1, 34, 20, 8, 1, 16, 128, 8, 26, + 1, 124, 34, 2, 47, 47, 40, 2, 160, 6, 19, 168, 6, 1, 42, 10, 10, 3, 32, 154, 5, 16, 142, 8, + 24, 1, 50, 4, 116, 101, 115, 116, 160, 6, 19, 168, 6, 1, + ]; + + let want = mt::UserStageInfo { + stage_name: "root".to_string(), + stage_type: mt::StageType::User, + stage_params: mt::StageParams { + storage: StorageParams::Fs(StorageFsConfig { + root: "/dir/to/files".to_string(), + }), + }, + file_format_options: mt::FileFormatOptions { + format: mt::StageFileFormatType::Json, + skip_header: 1024, + field_delimiter: "|".to_string(), + record_delimiter: "//".to_string(), + escape: "".to_string(), + compression: mt::StageFileCompression::Bz2, + }, + copy_options: mt::CopyOptions { + on_error: mt::OnErrorMode::SkipFileNum(666), + size_limit: 1038, + split_size: 0, + purge: true, + single: false, + max_file_size: 0, + }, + comment: "test".to_string(), + ..Default::default() + }; + + common::test_load_old(func_name!(), user_stage_v18.as_slice(), want)?; + Ok(()) +} From 2ee48579f235ab395452b8ce9abc056c4d3c53a0 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 31 Oct 2022 15:28:00 +0800 Subject: [PATCH 3/6] Fix typo Signed-off-by: Xuanwo --- src/meta/proto-conv/tests/it/user_stage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta/proto-conv/tests/it/user_stage.rs b/src/meta/proto-conv/tests/it/user_stage.rs index 74e46a5f8b3f..be8e7ca8f93f 100644 --- a/src/meta/proto-conv/tests/it/user_stage.rs +++ b/src/meta/proto-conv/tests/it/user_stage.rs @@ -857,7 +857,7 @@ fn test_internal_stage_v17() -> anyhow::Result<()> { fn test_user_stage_v18() -> anyhow::Result<()> { common::test_pb_from_to("user_stage_v18", test_user_stage_info_v18())?; - // Encoded data of version v17 of internal: + // Encoded data of version v18 of user_stage: // It is generated with common::test_pb_from_to. let user_stage_v18 = vec![ 10, 4, 114, 111, 111, 116, 16, 3, 26, 25, 10, 23, 18, 21, 10, 13, 47, 100, 105, 114, 47, From 4f6b87d77340cebf2b28e9806919cadeeb16f5e8 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 31 Oct 2022 16:12:24 +0800 Subject: [PATCH 4/6] Fix unit tests Signed-off-by: Xuanwo --- src/meta/types/src/user_stage.rs | 1 - src/query/storages/preludes/src/system/stages_table.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/meta/types/src/user_stage.rs b/src/meta/types/src/user_stage.rs index d7e2b2c64d75..8e8fcc917758 100644 --- a/src/meta/types/src/user_stage.rs +++ b/src/meta/types/src/user_stage.rs @@ -252,7 +252,6 @@ pub struct UserStageInfo { pub comment: String, /// TODO(xuanwo): stage doesn't have this info anymore, remove it. pub number_of_files: u64, - /// TODO(xuanwo): stage doesn't have this info anymore, remove it. pub creator: Option, } diff --git a/src/query/storages/preludes/src/system/stages_table.rs b/src/query/storages/preludes/src/system/stages_table.rs index 21d18ad37ad1..ad6a91a61bfa 100644 --- a/src/query/storages/preludes/src/system/stages_table.rs +++ b/src/query/storages/preludes/src/system/stages_table.rs @@ -59,7 +59,7 @@ impl AsyncSystemTable for StagesTable { copy_options.push(format!("{:?}", stage.copy_options).into_bytes()); file_format_options.push(format!("{:?}", stage.file_format_options).into_bytes()); number_of_files.push(None); - creator.push(None); + creator.push(stage.creator.map(|c| c.to_string().into_bytes())); comment.push(stage.comment.clone().into_bytes()); } Ok(DataBlock::create(self.table_info.schema(), vec![ From 54ae0e6f716c0e372752ec611e7b02d61448d3c1 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 31 Oct 2022 16:31:18 +0800 Subject: [PATCH 5/6] Fix the compat test Signed-off-by: Xuanwo --- src/query/storages/preludes/src/system/stages_table.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/query/storages/preludes/src/system/stages_table.rs b/src/query/storages/preludes/src/system/stages_table.rs index ad6a91a61bfa..e19673a175c1 100644 --- a/src/query/storages/preludes/src/system/stages_table.rs +++ b/src/query/storages/preludes/src/system/stages_table.rs @@ -58,7 +58,8 @@ impl AsyncSystemTable for StagesTable { stage_params.push(format!("{:?}", stage.stage_params).into_bytes()); copy_options.push(format!("{:?}", stage.copy_options).into_bytes()); file_format_options.push(format!("{:?}", stage.file_format_options).into_bytes()); - number_of_files.push(None); + // TODO(xuanwo): we will remove this line. + number_of_files.push(Some(0)); creator.push(stage.creator.map(|c| c.to_string().into_bytes())); comment.push(stage.comment.clone().into_bytes()); } From c5a78006e4a515bbe544d68bb936723616202cb0 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 31 Oct 2022 16:57:54 +0800 Subject: [PATCH 6/6] revert this change Signed-off-by: Xuanwo --- src/query/storages/preludes/src/system/stages_table.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/query/storages/preludes/src/system/stages_table.rs b/src/query/storages/preludes/src/system/stages_table.rs index e19673a175c1..babbec968152 100644 --- a/src/query/storages/preludes/src/system/stages_table.rs +++ b/src/query/storages/preludes/src/system/stages_table.rs @@ -22,6 +22,7 @@ use common_exception::Result; use common_meta_app::schema::TableIdent; use common_meta_app::schema::TableInfo; use common_meta_app::schema::TableMeta; +use common_meta_types::StageType; use common_users::UserApiProvider; use super::table::AsyncOneBlockSystemTable; @@ -59,7 +60,14 @@ impl AsyncSystemTable for StagesTable { copy_options.push(format!("{:?}", stage.copy_options).into_bytes()); file_format_options.push(format!("{:?}", stage.file_format_options).into_bytes()); // TODO(xuanwo): we will remove this line. - number_of_files.push(Some(0)); + match stage.stage_type { + StageType::LegacyInternal | StageType::Internal | StageType::User => { + number_of_files.push(Some(stage.number_of_files)); + } + StageType::External => { + number_of_files.push(None); + } + }; creator.push(stage.creator.map(|c| c.to_string().into_bytes())); comment.push(stage.comment.clone().into_bytes()); }