Skip to content

Commit

Permalink
chore: assign topics with hash map
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Nov 24, 2023
1 parent 6f41530 commit 6087d92
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 13 deletions.
29 changes: 18 additions & 11 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{
CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader,
Expand Down Expand Up @@ -39,6 +41,7 @@ use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use crate::wal::kafka::KafkaTopic;

// TODO(niebayes): Maybe move `TOPIC_KEY` into a more appropriate crate.
pub const TOPIC_KEY: &str = "kafka_topic";

pub struct CreateTableProcedure {
Expand Down Expand Up @@ -176,20 +179,26 @@ impl CreateTableProcedure {
let region_routes = &create_table_data.region_routes;
let region_topics = &create_table_data.region_topics;

// The following checking is redundant as the wal meta allocator ensures the allocated topics
// are of the same length as the region routes. The checking only makes sense in distributed mode.
// The following checking is redundant as the wal meta allocator ensures the allocated
// region topics are of the same length as the region routes.
if !region_topics.is_empty() {
let num_region_topics = region_topics.len();
let num_region_routes = region_routes.len();
let num_region_topics = region_topics.len();
ensure!(
num_region_topics != num_region_routes,
num_region_routes == num_region_topics,
UnexpectedNumRegionTopicsSnafu {
num_region_topics,
num_region_routes,
num_region_routes
}
);
}

let region_topic_map: HashMap<_, _> = region_routes
.iter()
.map(|route| route.region.id.region_number())
.zip(region_topics)
.collect();

let create_table_expr = &create_table_data.task.create_table;
let catalog = &create_table_expr.catalog_name;
let schema = &create_table_expr.schema_name;
Expand All @@ -206,19 +215,17 @@ impl CreateTableProcedure {
let regions = find_leader_regions(region_routes, &datanode);
let requests = regions
.iter()
.enumerate()
.map(|(i, region_number)| {
.map(|region_number| {
let region_id = RegionId::new(self.table_id(), *region_number);

let mut create_region_request = request_template.clone();
create_region_request.region_id = region_id.as_u64();
create_region_request.path = storage_path.clone();

if !region_topics.is_empty() {
region_topic_map.get(region_number).and_then(|topic| {
create_region_request
.options
.insert(TOPIC_KEY.to_string(), region_topics[i].clone());
}
.insert(TOPIC_KEY.to_string(), topic.to_string())
});

PbRegionRequest::Create(create_region_request)
})
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ pub enum Error {
#[snafu(display(
"Unexpected number of region topics, num_region_topics: {}, num_region_routes: {}",
num_region_topics,
num_region_routes
num_region_routes,
))]
UnexpectedNumRegionTopics {
num_region_topics: usize,
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/table_meta_alloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator {
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;

// Each region gets assigned one topic.
let region_topics = match self.wal_meta_allocator.wal_provider() {
WalProvider::RaftEngine => vec![],
WalProvider::Kafka => {
// Each region gets assigned one topic.
self.wal_meta_allocator
.try_alloc_topics(region_routes.len())
.await?
Expand Down

0 comments on commit 6087d92

Please sign in to comment.