Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use MetadataKey for kafka topic #5351

Merged
merged 17 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,14 @@
//! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
//! - Mapping source table's {table_id} to {flownode_id}
//! - Used in `Flownode` booting.
//!
//! 11. View info key: `__view_info/{view_id}`
//! - The value is a [ViewInfoValue] struct; it contains the encoded logical plan.
//! - This key is mainly used in constructing the view in Datanode and Frontend.
//!
//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
//!     - The key is used to mark existing topics in kafka for WAL.
//!
//! All keys have related managers. The managers take care of the serialization and deserialization
//! of keys and values, and the interaction with the underlying KV store backend.
//!
Expand Down Expand Up @@ -100,6 +104,7 @@ pub mod table_route;
#[cfg(any(test, feature = "testing"))]
pub mod test_utils;
mod tombstone;
pub mod topic_name;
pub(crate) mod txn_helper;
pub mod view_info;

Expand Down Expand Up @@ -158,6 +163,9 @@ pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Prefix for per-topic Kafka WAL keys: `__topic_name/kafka/{topic_name}`.
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
// The legacy topic key prefix is used to store the topic name in previous versions.
// Legacy versions stored all topic names as one JSON array under this single key.
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";

/// The keys with these prefixes will be loaded into the cache when the leader starts.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
Expand Down Expand Up @@ -223,6 +231,11 @@ lazy_static! {
Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
}

lazy_static! {
    /// Matches a full Kafka topic key (`__topic_name/kafka/{topic_name}`) and
    /// captures the topic name in group 1.
    pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
        Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
}

/// The key of metadata.
pub trait MetadataKey<'a, T> {
fn to_bytes(&self) -> Vec<u8>;
Expand Down
218 changes: 218 additions & 0 deletions src/common/meta/src/key/topic_name.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{self, Display};
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};

use crate::error::{DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result};
use crate::key::{
MetadataKey, KAFKA_TOPIC_KEY_PATTERN, KAFKA_TOPIC_KEY_PREFIX, LEGACY_TOPIC_KEY_PREFIX,
};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{BatchPutRequest, RangeRequest};
use crate::rpc::KeyValue;

/// Metadata key marking an existing Kafka topic used for remote WAL.
///
/// Layout: `__topic_name/kafka/{topic_name}` — the topic name itself is
/// stored in the key, so the corresponding value is empty.
#[derive(Debug, Clone, PartialEq)]
pub struct TopicNameKey<'a> {
    // The Kafka topic name (the tail segment of the key).
    pub topic: &'a str,
}

/// Value type paired with [TopicNameKey].
///
/// Carries no data: the topic name lives entirely in the key (the manager
/// writes empty byte vectors as values).
#[derive(Debug, Serialize, Deserialize)]
pub struct TopicNameValue;

impl<'a> TopicNameKey<'a> {
    /// Creates a key for the given topic name.
    pub fn new(topic: &'a str) -> Self {
        Self { topic }
    }

    /// Builds a topic name of the form `{prefix}_{id}`.
    ///
    /// This is a plain string helper: the lifetime of `prefix` is unrelated
    /// to the key lifetime `'a`, so no lifetime bound is needed (the original
    /// `&'a str` needlessly coupled the borrow to the key's lifetime).
    pub fn gen_with_id_and_prefix(id: usize, prefix: &str) -> String {
        format!("{}_{}", prefix, id)
    }

    /// Returns the start key for range-scanning all Kafka topic keys.
    pub fn range_start_key() -> String {
        KAFKA_TOPIC_KEY_PREFIX.to_string()
    }
}

impl<'a> MetadataKey<'a, TopicNameKey<'a>> for TopicNameKey<'_> {
    /// Serializes the key via its `Display` form (`__topic_name/kafka/{topic}`).
    fn to_bytes(&self) -> Vec<u8> {
        self.to_string().into_bytes()
    }

    /// Deserializes a key from raw bytes.
    ///
    /// Fails with `InvalidMetadata` when the bytes are not valid UTF-8 or do
    /// not match the expected key layout.
    fn from_bytes(bytes: &'a [u8]) -> Result<TopicNameKey<'a>> {
        let key = std::str::from_utf8(bytes).map_err(|e| {
            InvalidMetadataSnafu {
                err_msg: format!(
                    "TopicNameKey '{}' is not a valid UTF8 string: {e}",
                    String::from_utf8_lossy(bytes)
                ),
            }
            .build()
        })?;
        TopicNameKey::try_from(key)
    }
}

impl Display for TopicNameKey<'_> {
    /// Renders the full key: `__topic_name/kafka/{topic}`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{KAFKA_TOPIC_KEY_PREFIX}/{}", self.topic)
    }
}

impl<'a> TryFrom<&'a str> for TopicNameKey<'a> {
type Error = Error;

fn try_from(value: &'a str) -> Result<TopicNameKey<'a>> {
let captures = KAFKA_TOPIC_KEY_PATTERN
.captures(value)
.context(InvalidMetadataSnafu {
err_msg: format!("Invalid topic name key: {}", value),
})?;

// Safety: pass the regex check above
Ok(TopicNameKey {
topic: captures.get(1).unwrap().as_str(),
})
}
}

/// Convert a key-value pair to a topic name.
///
/// Only the key is inspected: topic entries carry empty values.
fn topic_decoder(kv: &KeyValue) -> Result<String> {
    TopicNameKey::from_bytes(&kv.key).map(|key| key.topic.to_string())
}

pub struct TopicNameKeyManager {
CookiePieWw marked this conversation as resolved.
Show resolved Hide resolved
kv_backend: KvBackendRef,
}
CookiePieWw marked this conversation as resolved.
Show resolved Hide resolved

impl Default for TopicNameKeyManager {
    /// Builds a manager over a fresh in-memory KV backend.
    fn default() -> Self {
        Self {
            kv_backend: Arc::new(MemoryKvBackend::default()),
        }
    }
}

impl TopicNameKeyManager {
    /// Creates a manager over the given KV store backend.
    pub fn new(kv_backend: KvBackendRef) -> Self {
        Self { kv_backend }
    }

    /// Update the topics in legacy format to the new format.
    ///
    /// Legacy versions stored all topic names as a single JSON array under
    /// [LEGACY_TOPIC_KEY_PREFIX]; this migrates each topic to its own
    /// `__topic_name/kafka/{topic}` key and deletes the legacy entry within
    /// one transaction, so the migration is all-or-nothing.
    pub async fn update_legacy_topics(&self) -> Result<()> {
        if let Some(kv) = self
            .kv_backend
            .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
            .await?
        {
            let topics =
                serde_json::from_slice::<Vec<String>>(&kv.value).context(DecodeJsonSnafu)?;
            let mut reqs = topics
                .iter()
                .map(|topic| {
                    let key = TopicNameKey::new(topic);
                    // The topic name lives entirely in the key; values are empty.
                    TxnOp::Put(key.to_bytes(), vec![])
                })
                .collect::<Vec<_>>();
            let delete_req = TxnOp::Delete(LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec());
            reqs.push(delete_req);
            let txn = Txn::new().and_then(reqs);
            self.kv_backend.txn(txn).await?;
        }
        Ok(())
    }

    /// Range query for topics.
    /// Caution: this method returns keys as String instead of values of range query since the topics are stored in keys.
    pub async fn range(&self) -> Result<Vec<String>> {
        let prefix = TopicNameKey::range_start_key();
        let raw_prefix = prefix.as_bytes();
        let req = RangeRequest::new().with_prefix(raw_prefix);
        let resp = self.kv_backend.range(req).await?;
        resp.kvs
            .iter()
            .map(topic_decoder)
            .collect::<Result<Vec<String>>>()
    }

    /// Put topics into kvbackend.
    ///
    /// Each topic becomes one key with an empty value; existing entries are
    /// overwritten (`prev_kv: false` — previous values are not returned).
    pub async fn batch_put(&self, topic_name_keys: Vec<TopicNameKey<'_>>) -> Result<()> {
        let req = BatchPutRequest {
            kvs: topic_name_keys
                .iter()
                .map(|key| KeyValue {
                    key: key.to_bytes(),
                    value: vec![],
                })
                .collect(),
            prev_kv: false,
        };
        self.kv_backend.batch_put(req).await?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;
    use crate::rpc::store::PutRequest;

    // End-to-end test of batch_put, range, and the legacy-format migration.
    #[tokio::test]
    async fn test_topic_name_key_manager() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = TopicNameKeyManager::new(kv_backend.clone());

        // Sorted so the expected order matches the KV store's key ordering.
        let mut all_topics = (0..16)
            .map(|i| format!("{}/{}", KAFKA_TOPIC_KEY_PREFIX, i))
            .collect::<Vec<_>>();
        all_topics.sort();
        let topic_name_keys = all_topics
            .iter()
            .map(|topic| TopicNameKey::new(topic))
            .collect::<Vec<_>>();

        manager.batch_put(topic_name_keys.clone()).await.unwrap();

        let topics = manager.range().await.unwrap();
        assert_eq!(topics, all_topics);

        // Seed a legacy-format entry, then verify migration deletes it while
        // keeping the full topic set queryable.
        kv_backend
            .put(PutRequest {
                key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(),
                value: serde_json::to_vec(&all_topics).unwrap(),
                prev_kv: false,
            })
            .await
            .unwrap();
        manager.update_legacy_topics().await.unwrap();
        let res = kv_backend
            .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
            .await
            .unwrap();
        assert!(res.is_none());
        // Originally this block asserted the same range twice; the duplicate
        // check has been removed.
        let topics = manager.range().await.unwrap();
        assert_eq!(topics, all_topics);
    }
}
23 changes: 11 additions & 12 deletions src/common/meta/src/wal_options_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod kafka;
mod kafka_topic_pool;
mod kvbackend;
mod selector;

use std::collections::HashMap;
use std::sync::Arc;
Expand All @@ -26,14 +28,14 @@ use store_api::storage::{RegionId, RegionNumber};
use crate::error::{EncodeWalOptionsSnafu, Result};
use crate::kv_backend::KvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
use crate::wal_options_allocator::kafka::topic_manager::TopicManager as KafkaTopicManager;
use crate::wal_options_allocator::kafka_topic_pool::KafkaTopicPool;

/// Allocates wal options in region granularity.
#[derive(Default)]
pub enum WalOptionsAllocator {
#[default]
RaftEngine,
Kafka(KafkaTopicManager),
Kafka(KafkaTopicPool),
}

/// Arc wrapper of WalOptionsAllocator.
Expand All @@ -45,7 +47,7 @@ impl WalOptionsAllocator {
match config {
MetasrvWalConfig::RaftEngine => Self::RaftEngine,
MetasrvWalConfig::Kafka(kafka_config) => {
Self::Kafka(KafkaTopicManager::new(kafka_config, kv_backend))
Self::Kafka(KafkaTopicPool::new(kafka_config, kv_backend))
}
}
}
Expand All @@ -54,7 +56,7 @@ impl WalOptionsAllocator {
pub async fn start(&self) -> Result<()> {
match self {
Self::RaftEngine => Ok(()),
Self::Kafka(kafka_topic_manager) => kafka_topic_manager.start().await,
Self::Kafka(kafka_topic_manager) => kafka_topic_manager.init().await,
}
}

Expand Down Expand Up @@ -146,7 +148,6 @@ mod tests {

use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::wal_options_allocator::kafka::topic_selector::RoundRobinTopicSelector;

// Tests that the wal options allocator could successfully allocate raft-engine wal options.
#[tokio::test]
Expand Down Expand Up @@ -191,14 +192,12 @@ mod tests {
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let mut topic_manager = KafkaTopicManager::new(config.clone(), kv_backend);
// Replaces the default topic pool with the constructed topics.
topic_manager.topic_pool.clone_from(&topics);
// Replaces the default selector with a round-robin selector without shuffled.
topic_manager.topic_selector = Arc::new(RoundRobinTopicSelector::default());
let mut topic_pool = KafkaTopicPool::new(config.clone(), kv_backend);
topic_pool.topics.clone_from(&topics);
topic_pool.selector = Arc::new(selector::RoundRobinTopicSelector::default());

// Creates an options allocator.
let allocator = WalOptionsAllocator::Kafka(topic_manager);
let allocator = WalOptionsAllocator::Kafka(topic_pool);
allocator.start().await.unwrap();

let num_regions = 32;
Expand Down
16 changes: 0 additions & 16 deletions src/common/meta/src/wal_options_allocator/kafka.rs

This file was deleted.

Loading
Loading