Skip to content

Commit

Permalink
chore(remote_wal): remove topic alias (#3120)
Browse files Browse the repository at this point in the history
chore: remove topic alias
  • Loading branch information
niebayes authored Jan 9, 2024
1 parent 0db1861 commit a0a31c8
Show file tree
Hide file tree
Showing 12 changed files with 23 additions and 58 deletions.
4 changes: 1 addition & 3 deletions src/common/config/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ pub mod raft_engine;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;

pub use crate::wal::kafka::{
KafkaConfig, KafkaOptions as KafkaWalOptions, StandaloneKafkaConfig, Topic as KafkaWalTopic,
};
pub use crate::wal::kafka::{KafkaConfig, KafkaOptions as KafkaWalOptions, StandaloneKafkaConfig};
pub use crate::wal::raft_engine::RaftEngineConfig;

/// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair
Expand Down
7 changes: 1 addition & 6 deletions src/common/config/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@ use rskafka::client::partition::Compression as RsKafkaCompression;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;

/// Topic name prefix.
pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic";
/// Kafka wal topic.
pub type Topic = String;

/// The type of the topic selector, i.e. with which strategy to select a topic.
#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -138,5 +133,5 @@ impl Default for StandaloneKafkaConfig {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaOptions {
/// Kafka wal topic.
pub topic: Topic,
pub topic: String,
}
1 change: 0 additions & 1 deletion src/common/meta/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use serde::{Deserialize, Serialize};
use store_api::storage::{RegionId, RegionNumber};

use crate::wal::kafka::KafkaConfig;
pub use crate::wal::kafka::Topic as KafkaWalTopic;
pub use crate::wal::options_allocator::{
allocate_region_wal_options, WalOptionsAllocator, WalOptionsAllocatorRef,
};
Expand Down
2 changes: 0 additions & 2 deletions src/common/meta/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#[cfg(any(test, feature = "testing"))]
pub mod test_util;
pub mod topic;
pub mod topic_manager;
pub mod topic_selector;

Expand All @@ -23,7 +22,6 @@ use std::time::Duration;
use common_config::wal::kafka::{kafka_backoff, KafkaBackoffConfig, TopicSelectorType};
use serde::{Deserialize, Serialize};

pub use crate::wal::kafka::topic::Topic;
pub use crate::wal::kafka::topic_manager::TopicManager;

/// Configurations for kafka wal.
Expand Down
19 changes: 0 additions & 19 deletions src/common/meta/src/wal/kafka/topic.rs

This file was deleted.

19 changes: 9 additions & 10 deletions src/common/meta/src/wal/kafka/topic_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use crate::error::{
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
use crate::wal::kafka::topic::Topic;
use crate::wal::kafka::topic_selector::{RoundRobinTopicSelector, TopicSelectorRef};
use crate::wal::kafka::KafkaConfig;

Expand All @@ -46,7 +45,7 @@ const DEFAULT_PARTITION: i32 = 0;
/// Manages topic initialization and selection.
pub struct TopicManager {
config: KafkaConfig,
pub(crate) topic_pool: Vec<Topic>,
pub(crate) topic_pool: Vec<String>,
pub(crate) topic_selector: TopicSelectorRef,
kv_backend: KvBackendRef,
}
Expand Down Expand Up @@ -86,7 +85,7 @@ impl TopicManager {
let created_topics = Self::restore_created_topics(&self.kv_backend)
.await?
.into_iter()
.collect::<HashSet<Topic>>();
.collect::<HashSet<String>>();

// Creates missing topics.
let to_be_created = topics
Expand All @@ -108,7 +107,7 @@ impl TopicManager {
}

/// Tries to create topics specified by indexes in `to_be_created`.
async fn try_create_topics(&self, topics: &[Topic], to_be_created: &[usize]) -> Result<()> {
async fn try_create_topics(&self, topics: &[String], to_be_created: &[usize]) -> Result<()> {
// Builds an kafka controller client for creating topics.
let backoff_config = BackoffConfig {
init_backoff: self.config.backoff.init,
Expand Down Expand Up @@ -141,18 +140,18 @@ impl TopicManager {
}

/// Selects one topic from the topic pool through the topic selector.
pub fn select(&self) -> Result<&Topic> {
pub fn select(&self) -> Result<&String> {
self.topic_selector.select(&self.topic_pool)
}

/// Selects a batch of topics from the topic pool through the topic selector.
pub fn select_batch(&self, num_topics: usize) -> Result<Vec<&Topic>> {
pub fn select_batch(&self, num_topics: usize) -> Result<Vec<&String>> {
(0..num_topics)
.map(|_| self.topic_selector.select(&self.topic_pool))
.collect()
}

async fn try_append_noop_record(&self, topic: &Topic, client: &Client) -> Result<()> {
async fn try_append_noop_record(&self, topic: &String, client: &Client) -> Result<()> {
let partition_client = client
.partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
.await
Expand All @@ -177,7 +176,7 @@ impl TopicManager {
Ok(())
}

async fn try_create_topic(&self, topic: &Topic, client: &ControllerClient) -> Result<()> {
async fn try_create_topic(&self, topic: &String, client: &ControllerClient) -> Result<()> {
match client
.create_topic(
topic.clone(),
Expand All @@ -203,7 +202,7 @@ impl TopicManager {
}
}

async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result<Vec<Topic>> {
async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result<Vec<String>> {
kv_backend
.get(CREATED_TOPICS_KEY.as_bytes())
.await?
Expand All @@ -213,7 +212,7 @@ impl TopicManager {
)
}

async fn persist_created_topics(topics: &[Topic], kv_backend: &KvBackendRef) -> Result<()> {
async fn persist_created_topics(topics: &[String], kv_backend: &KvBackendRef) -> Result<()> {
let raw_topics = serde_json::to_vec(topics).context(EncodeJsonSnafu)?;
kv_backend
.put(PutRequest {
Expand Down
5 changes: 2 additions & 3 deletions src/common/meta/src/wal/kafka/topic_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ use rand::Rng;
use snafu::ensure;

use crate::error::{EmptyTopicPoolSnafu, Result};
use crate::wal::kafka::topic::Topic;

/// Controls topic selection.
pub(crate) trait TopicSelector: Send + Sync {
/// Selects a topic from the topic pool.
fn select<'a>(&self, topic_pool: &'a [Topic]) -> Result<&'a Topic>;
fn select<'a>(&self, topic_pool: &'a [String]) -> Result<&'a String>;
}

/// Arc wrapper of TopicSelector.
Expand All @@ -48,7 +47,7 @@ impl RoundRobinTopicSelector {
}

impl TopicSelector for RoundRobinTopicSelector {
fn select<'a>(&self, topic_pool: &'a [Topic]) -> Result<&'a Topic> {
fn select<'a>(&self, topic_pool: &'a [String]) -> Result<&'a String> {
ensure!(!topic_pool.is_empty(), EmptyTopicPoolSnafu);
let which = self.cursor.fetch_add(1, Ordering::Relaxed) % topic_pool.len();
Ok(&topic_pool[which])
Expand Down
5 changes: 2 additions & 3 deletions src/log-store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::any::Any;

use common_config::wal::KafkaWalTopic;
use common_error::ext::ErrorExt;
use common_macro::stack_trace_debug;
use common_runtime::error::Error as RuntimeError;
Expand Down Expand Up @@ -120,7 +119,7 @@ pub enum Error {
error
))]
GetClient {
topic: KafkaWalTopic,
topic: String,
location: Location,
error: String,
},
Expand All @@ -141,7 +140,7 @@ pub enum Error {
limit,
))]
ProduceRecord {
topic: KafkaWalTopic,
topic: String,
size: usize,
limit: usize,
location: Location,
Expand Down
3 changes: 1 addition & 2 deletions src/log-store/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub(crate) mod util;

use std::fmt::Display;

use common_meta::wal::KafkaWalTopic as Topic;
use serde::{Deserialize, Serialize};
use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::namespace::Namespace;
Expand All @@ -29,7 +28,7 @@ use crate::error::Error;
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct NamespaceImpl {
pub region_id: u64,
pub topic: Topic,
pub topic: String,
}

impl Namespace for NamespaceImpl {
Expand Down
10 changes: 5 additions & 5 deletions src/log-store/src/kafka/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic};
use common_config::wal::KafkaConfig;
use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
use rskafka::client::producer::aggregator::RecordAggregator;
use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
Expand Down Expand Up @@ -67,7 +67,7 @@ pub(crate) struct ClientManager {
client_factory: RsKafkaClient,
/// A pool maintaining a collection of clients.
/// Key: a topic. Value: the associated client of the topic.
client_pool: RwLock<HashMap<Topic, Client>>,
client_pool: RwLock<HashMap<String, Client>>,
}

impl ClientManager {
Expand Down Expand Up @@ -97,7 +97,7 @@ impl ClientManager {

/// Gets the client associated with the topic. If the client does not exist, a new one will
/// be created and returned.
pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
pub(crate) async fn get_or_insert(&self, topic: &String) -> Result<Client> {
{
let client_pool = self.client_pool.read().await;
if let Some(client) = client_pool.get(topic) {
Expand All @@ -116,7 +116,7 @@ impl ClientManager {
}
}

async fn try_create_client(&self, topic: &Topic) -> Result<Client> {
async fn try_create_client(&self, topic: &String) -> Result<Client> {
// Sets to Retry to retry connecting if the kafka cluter replies with an UnknownTopic error.
// That's because the topic is believed to exist as the metasrv is expected to create required topics upon start.
// The reconnecting won't stop until succeed or a different error returns.
Expand Down Expand Up @@ -147,7 +147,7 @@ mod tests {
test_name: &str,
num_topics: usize,
broker_endpoints: Vec<String>,
) -> (ClientManager, Vec<Topic>) {
) -> (ClientManager, Vec<String>) {
let topics = create_topics(
num_topics,
|i| format!("{test_name}_{}_{}", i, uuid::Uuid::new_v4()),
Expand Down
3 changes: 1 addition & 2 deletions src/log-store/src/kafka/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ fn check_termination(
#[cfg(test)]
mod tests {
use common_base::readable_size::ReadableSize;
use common_config::wal::KafkaWalTopic as Topic;
use rand::seq::IteratorRandom;

use super::*;
Expand All @@ -304,7 +303,7 @@ mod tests {
test_name: &str,
num_topics: usize,
broker_endpoints: Vec<String>,
) -> (KafkaLogStore, Vec<Topic>) {
) -> (KafkaLogStore, Vec<String>) {
let topics = create_topics(
num_topics,
|i| format!("{test_name}_{}_{}", i, uuid::Uuid::new_v4()),
Expand Down
3 changes: 1 addition & 2 deletions src/log-store/src/test_util/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::sync::atomic::{AtomicU64 as AtomicEntryId, Ordering};
use std::sync::Mutex;

use common_meta::wal::KafkaWalTopic as Topic;
use rand::distributions::Alphanumeric;
use rand::rngs::ThreadRng;
use rand::{thread_rng, Rng};
Expand All @@ -29,7 +28,7 @@ pub async fn create_topics<F>(
num_topics: usize,
decorator: F,
broker_endpoints: &[String],
) -> Vec<Topic>
) -> Vec<String>
where
F: Fn(usize) -> String,
{
Expand Down

0 comments on commit a0a31c8

Please sign in to comment.