Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use MetadataKey for kafka topic #5351

Merged
merged 17 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to build wal options allocator"))]
BuildWalOptionsAllocator {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
}

/// Convenience alias for `std::result::Result` with this file's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -378,7 +385,8 @@ impl ErrorExt for Error {

Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
Error::BuildWalOptionsAllocator { source, .. }
| Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
Error::ReplCreation { .. } | Error::Readline { .. } | Error::HttpQuerySql { .. } => {
StatusCode::Internal
}
Expand Down
19 changes: 10 additions & 9 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{WalOptionsAllocator, WalOptionsAllocatorRef};
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
use common_telemetry::info;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
Expand Down Expand Up @@ -76,10 +76,10 @@ use tokio::sync::{broadcast, RwLock};
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{
BuildCacheRegistrySnafu, CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu,
InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu, Result,
ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu,
StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
BuildCacheRegistrySnafu, BuildWalOptionsAllocatorSnafu, CreateDirSnafu, IllegalConfigSnafu,
InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu,
Result, ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu,
StartDatanodeSnafu, StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
Expand Down Expand Up @@ -563,10 +563,11 @@ impl StartCommand {
.step(10)
.build(),
);
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
opts.wal.clone().into(),
kv_backend.clone(),
));
let kafka_options = opts.wal.clone().into();
let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
.await
.context(BuildWalOptionsAllocatorSnafu)?;
let wal_options_allocator = Arc::new(wal_options_allocator);
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
Expand Down
13 changes: 13 additions & 0 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,14 @@
//! 10. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
//! - Mapping source table's {table_id} to {flownode_id}
//! - Used in `Flownode` booting.
//!
//! 11. View info key: `__view_info/{view_id}`
//! - The value is a [ViewInfoValue] struct; it contains the encoded logical plan.
//! - This key is mainly used in constructing the view in Datanode and Frontend.
//!
//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
CookiePieWw marked this conversation as resolved.
Show resolved Hide resolved
//! - The key is used to mark existing topics in kafka for WAL.
//!
//! All keys have related managers. The managers take care of the serialization and deserialization
//! of keys and values, and the interaction with the underlying KV store backend.
//!
Expand Down Expand Up @@ -100,6 +104,7 @@ pub mod table_route;
#[cfg(any(test, feature = "testing"))]
pub mod test_utils;
mod tombstone;
pub mod topic_name;
pub(crate) mod txn_helper;
pub mod view_info;

Expand Down Expand Up @@ -158,6 +163,9 @@ pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
pub const NODE_ADDRESS_PREFIX: &str = "__node_address";
/// Key prefix under which Kafka WAL topics are recorded, one key per topic:
/// `__topic_name/kafka/{topic_name}`.
pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
/// The legacy topic key prefix is used to store the topic name in previous versions.
/// It is read as a single key whose value is a JSON array of topic names
/// (see `TopicNameKeyManager::update_legacy_topics` in `key/topic_name.rs`).
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";

/// The keys with these prefixes will be loaded into the cache when the leader starts.
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
Expand Down Expand Up @@ -223,6 +231,11 @@ lazy_static! {
Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
}

lazy_static! {
    /// Matches a full Kafka topic key of the form `{KAFKA_TOPIC_KEY_PREFIX}/{topic_name}`.
    /// Capture group 1 is the topic name, i.e. everything after the prefix and its `/`.
    pub static ref KAFKA_TOPIC_KEY_PATTERN: Regex =
        Regex::new(&format!("^{KAFKA_TOPIC_KEY_PREFIX}/(.*)$")).unwrap();
}

/// The key of metadata.
pub trait MetadataKey<'a, T> {
fn to_bytes(&self) -> Vec<u8>;
Expand Down
218 changes: 218 additions & 0 deletions src/common/meta/src/key/topic_name.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{self, Display};
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};

use crate::error::{DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result};
use crate::key::{
MetadataKey, KAFKA_TOPIC_KEY_PATTERN, KAFKA_TOPIC_KEY_PREFIX, LEGACY_TOPIC_KEY_PREFIX,
};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{BatchPutRequest, RangeRequest};
use crate::rpc::KeyValue;

/// Metadata key that records a single Kafka topic name.
///
/// Its `Display` form is `{KAFKA_TOPIC_KEY_PREFIX}/{topic}`; the topic name is carried
/// in the key itself.
#[derive(Debug, Clone, PartialEq)]
pub struct TopicNameKey<'a> {
/// The topic name, borrowed either from the caller or from the raw key being parsed.
pub topic: &'a str,
}

/// Unit value type for topic keys; it carries no data.
///
/// NOTE(review): nothing in this file references this type — the manager writes
/// empty byte values (`vec![]`) directly. Confirm whether other modules use it.
#[derive(Debug, Serialize, Deserialize)]
pub struct TopicNameValue;

impl<'a> TopicNameKey<'a> {
pub fn new(topic: &'a str) -> Self {
Self { topic }
}

pub fn gen_with_id_and_prefix(id: usize, prefix: &'a str) -> String {
format!("{}_{}", prefix, id)
}

pub fn range_start_key() -> String {
KAFKA_TOPIC_KEY_PREFIX.to_string()
}
}

impl<'a> MetadataKey<'a, TopicNameKey<'a>> for TopicNameKey<'_> {
    /// Encodes the key as the UTF-8 bytes of its `Display` form.
    fn to_bytes(&self) -> Vec<u8> {
        let rendered = self.to_string();
        rendered.into_bytes()
    }

    /// Decodes a key from raw bytes.
    ///
    /// Fails with `InvalidMetadata` when the bytes are not valid UTF-8; otherwise
    /// the result is whatever `TopicNameKey::try_from` returns for the decoded string.
    fn from_bytes(bytes: &'a [u8]) -> Result<TopicNameKey<'a>> {
        match std::str::from_utf8(bytes) {
            Ok(raw_key) => TopicNameKey::try_from(raw_key),
            Err(e) => InvalidMetadataSnafu {
                err_msg: format!(
                    "TopicNameKey '{}' is not a valid UTF8 string: {e}",
                    String::from_utf8_lossy(bytes)
                ),
            }
            .fail(),
        }
    }
}

impl Display for TopicNameKey<'_> {
    /// Renders the full key: `{KAFKA_TOPIC_KEY_PREFIX}/{topic}`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let topic = self.topic;
        write!(f, "{KAFKA_TOPIC_KEY_PREFIX}/{topic}")
    }
}

impl<'a> TryFrom<&'a str> for TopicNameKey<'a> {
    type Error = Error;

    /// Parses a full key string of the form `{KAFKA_TOPIC_KEY_PREFIX}/{topic}`.
    ///
    /// The returned key borrows the topic name from `value`.
    ///
    /// # Errors
    ///
    /// Returns `InvalidMetadata` when `value` does not match `KAFKA_TOPIC_KEY_PATTERN`.
    fn try_from(value: &'a str) -> Result<TopicNameKey<'a>> {
        // Build the error message lazily: this runs once per key in range scans,
        // and the message is only needed when the match fails.
        let captures = KAFKA_TOPIC_KEY_PATTERN
            .captures(value)
            .with_context(|| InvalidMetadataSnafu {
                err_msg: format!("Invalid topic name key: {}", value),
            })?;

        // Safety: group 1 is not optional in the pattern, so it is present
        // whenever the regex check above passes.
        Ok(TopicNameKey {
            topic: captures.get(1).unwrap().as_str(),
        })
    }
}

/// Extracts the topic name from a key-value pair.
///
/// Only the key is inspected; the value is ignored. Fails if the key cannot be
/// parsed as a [`TopicNameKey`].
fn topic_decoder(kv: &KeyValue) -> Result<String> {
    TopicNameKey::from_bytes(&kv.key).map(|parsed| parsed.topic.to_string())
}

pub struct TopicNameKeyManager {
CookiePieWw marked this conversation as resolved.
Show resolved Hide resolved
kv_backend: KvBackendRef,
}
CookiePieWw marked this conversation as resolved.
Show resolved Hide resolved

impl Default for TopicNameKeyManager {
fn default() -> Self {
Self::new(Arc::new(MemoryKvBackend::default()))
}
}

impl TopicNameKeyManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}

/// Update the topics in legacy format to the new format.
pub async fn update_legacy_topics(&self) -> Result<()> {
if let Some(kv) = self
.kv_backend
.get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
.await?
{
let topics =
serde_json::from_slice::<Vec<String>>(&kv.value).context(DecodeJsonSnafu)?;
let mut reqs = topics
.iter()
.map(|topic| {
let key = TopicNameKey::new(topic);
TxnOp::Put(key.to_bytes(), vec![])
})
.collect::<Vec<_>>();
let delete_req = TxnOp::Delete(LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec());
reqs.push(delete_req);
let txn = Txn::new().and_then(reqs);
self.kv_backend.txn(txn).await?;
}
Ok(())
}

/// Range query for topics.
/// Caution: this method returns keys as String instead of values of range query since the topics are stoired in keys.
pub async fn range(&self) -> Result<Vec<String>> {
let prefix = TopicNameKey::range_start_key();
let raw_prefix = prefix.as_bytes();
let req = RangeRequest::new().with_prefix(raw_prefix);
let resp = self.kv_backend.range(req).await?;
resp.kvs
.iter()
.map(topic_decoder)
.collect::<Result<Vec<String>>>()
}

/// Put topics into kvbackend.
pub async fn batch_put(&self, topic_name_keys: Vec<TopicNameKey<'_>>) -> Result<()> {
let req = BatchPutRequest {
kvs: topic_name_keys
.iter()
.map(|key| KeyValue {
key: key.to_bytes(),
value: vec![],
})
.collect(),
prev_kv: false,
};
self.kv_backend.batch_put(req).await?;
Ok(())
}
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;
    use crate::rpc::store::PutRequest;

    /// Topic names used by the tests, sorted so they can be compared directly
    /// against the output of `range()`. The names contain `/` on purpose, which
    /// exercises parsing of topic names that themselves look like key paths.
    fn sorted_topics() -> Vec<String> {
        let mut topics = (0..16)
            .map(|i| format!("{}/{}", KAFKA_TOPIC_KEY_PREFIX, i))
            .collect::<Vec<_>>();
        topics.sort();
        topics
    }

    /// Topics written with `batch_put` are returned by `range`.
    #[tokio::test]
    async fn test_topic_name_key_manager() {
        let manager = TopicNameKeyManager::default();

        let all_topics = sorted_topics();
        let topic_name_keys = all_topics
            .iter()
            .map(|topic| TopicNameKey::new(topic))
            .collect::<Vec<_>>();

        manager.batch_put(topic_name_keys).await.unwrap();

        let topics = manager.range().await.unwrap();
        assert_eq!(topics, all_topics);
    }

    /// Legacy topics are migrated to per-topic keys and the legacy key is removed.
    ///
    /// The backend starts empty, so the topics seen after the migration can only
    /// have been written by `update_legacy_topics` itself.
    #[tokio::test]
    async fn test_update_legacy_topics() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = TopicNameKeyManager::new(kv_backend.clone());
        let all_topics = sorted_topics();

        kv_backend
            .put(PutRequest {
                key: LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec(),
                value: serde_json::to_vec(&all_topics).unwrap(),
                prev_kv: false,
            })
            .await
            .unwrap();
        // The legacy key is not under the new prefix, so nothing is listed yet.
        assert!(manager.range().await.unwrap().is_empty());

        manager.update_legacy_topics().await.unwrap();

        let res = kv_backend
            .get(LEGACY_TOPIC_KEY_PREFIX.as_bytes())
            .await
            .unwrap();
        assert!(res.is_none());
        let topics = manager.range().await.unwrap();
        assert_eq!(topics, all_topics);

        // With the legacy key gone, a second run must leave the topics untouched.
        manager.update_legacy_topics().await.unwrap();
        let topics = manager.range().await.unwrap();
        assert_eq!(topics, all_topics);
    }
}
Loading
Loading