From 6087d924e3acf41384d9fb5982cc0a2e5d03e8bd Mon Sep 17 00:00:00 2001 From: niebayes Date: Fri, 24 Nov 2023 11:01:11 +0800 Subject: [PATCH] chore: assign topics with hash map --- src/common/meta/src/ddl/create_table.rs | 29 +++++++++++++++---------- src/common/meta/src/error.rs | 2 +- src/meta-srv/src/table_meta_alloc.rs | 2 +- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index fd067245a890..ba082a7add5b 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use api::v1::region::region_request::Body as PbRegionRequest; use api::v1::region::{ CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader, @@ -39,6 +41,7 @@ use crate::rpc::ddl::CreateTableTask; use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; use crate::wal::kafka::KafkaTopic; +// TODO(niebayes): Maybe move `TOPIC_KEY` into a more appropriate crate. pub const TOPIC_KEY: &str = "kafka_topic"; pub struct CreateTableProcedure { @@ -176,20 +179,26 @@ impl CreateTableProcedure { let region_routes = &create_table_data.region_routes; let region_topics = &create_table_data.region_topics; - // The following checking is redundant as the wal meta allocator ensures the allocated topics - // are of the same length as the region routes. The checking only makes sense in distributed mode. + // The following checking is redundant as the wal meta allocator ensures the allocated + // region topics are of the same length as the region routes. if !region_topics.is_empty() { - let num_region_topics = region_topics.len(); let num_region_routes = region_routes.len(); + let num_region_topics = region_topics.len(); ensure!( - num_region_topics != num_region_routes, + num_region_routes == num_region_topics, UnexpectedNumRegionTopicsSnafu { num_region_topics, - num_region_routes, + num_region_routes } ); } + let region_topic_map: HashMap<_, _> = region_routes + .iter() + .map(|route| route.region.id.region_number()) + .zip(region_topics) + .collect(); + let create_table_expr = &create_table_data.task.create_table; let catalog = &create_table_expr.catalog_name; let schema = &create_table_expr.schema_name; @@ -206,19 +215,17 @@ impl CreateTableProcedure { let regions = find_leader_regions(region_routes, &datanode); let requests = regions .iter() - .enumerate() - .map(|(i, region_number)| { + .map(|region_number| { let region_id = RegionId::new(self.table_id(), *region_number); let mut create_region_request = request_template.clone(); create_region_request.region_id = region_id.as_u64(); create_region_request.path = storage_path.clone(); - - if !region_topics.is_empty() { + region_topic_map.get(region_number).and_then(|topic| { create_region_request .options - .insert(TOPIC_KEY.to_string(), region_topics[i].clone()); - } + .insert(TOPIC_KEY.to_string(), topic.to_string()) + }); PbRegionRequest::Create(create_region_request) }) diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index d1071a8c68ea..f8dc6a57d99e 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -331,7 +331,7 @@ pub enum Error { #[snafu(display( "Unexpected number of region topics, num_region_topics: {}, num_region_routes: {}", num_region_topics, - num_region_routes + num_region_routes, ))] UnexpectedNumRegionTopics { num_region_topics: usize, diff --git a/src/meta-srv/src/table_meta_alloc.rs b/src/meta-srv/src/table_meta_alloc.rs index def8973182a2..a1ebfd7fe77b 100644 --- a/src/meta-srv/src/table_meta_alloc.rs +++ b/src/meta-srv/src/table_meta_alloc.rs @@ -73,10 +73,10 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator { .map_err(BoxedError::new) .context(meta_error::ExternalSnafu)?; - // Each region gets assigned one topic. let region_topics = match self.wal_meta_allocator.wal_provider() { WalProvider::RaftEngine => vec![], WalProvider::Kafka => { + // Each region gets assigned one topic. self.wal_meta_allocator .try_alloc_topics(region_routes.len()) .await?