Skip to content

Commit

Permalink
add cache config about metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
fengys1996 committed Feb 4, 2024
1 parent 2cad4da commit 3c898c7
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 29 deletions.
4 changes: 4 additions & 0 deletions config/frontend.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ timeout = "3s"
ddl_timeout = "10s"
connect_timeout = "1s"
tcp_nodelay = true
# The configuration about the cache of the Metadata.
cached_max_capacity = 100000
cached_ttl = "10m"
cached_tti = "5m"

# Log options, see `standalone.example.toml`
# [logging]
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/kvbackend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub use client::{CachedMetaKvBackend, MetaKvBackend};
pub use client::{CachedMetaKvBackend, CachedMetaKvBackendBuilder, MetaKvBackend};

mod client;
mod manager;
Expand Down
86 changes: 74 additions & 12 deletions src/catalog/src/kvbackend/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,74 @@ use crate::metrics::{
METRIC_CATALOG_KV_BATCH_GET, METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET,
};

const CACHE_MAX_CAPACITY: u64 = 10000;
const CACHE_TTL_SECOND: u64 = 10 * 60;
const CACHE_TTI_SECOND: u64 = 5 * 60;
const DEFAULT_CACHE_MAX_CAPACITY: u64 = 10000;
const DEFAULT_CACHE_TTL_SECOND: u64 = 10 * 60;
const DEFAULT_CACHE_TTI_SECOND: u64 = 5 * 60;

pub struct CachedMetaKvBackendBuilder {
cached_max_capacity: Option<u64>,
cached_ttl: Option<Duration>,
cached_tti: Option<Duration>,
meta_client: Arc<MetaClient>,
}

impl CachedMetaKvBackendBuilder {
pub fn new(meta_client: Arc<MetaClient>) -> Self {
Self {
cached_max_capacity: None,
cached_ttl: None,
cached_tti: None,
meta_client,
}
}

pub fn cached_max_capacity(mut self, cache_max_capacity: u64) -> Self {
self.cached_max_capacity.replace(cache_max_capacity);
self
}

pub fn cached_ttl_second(mut self, cache_ttl: Duration) -> Self {
self.cached_ttl.replace(cache_ttl);
self
}

pub fn cached_tti_second(mut self, cache_tti: Duration) -> Self {
self.cached_tti.replace(cache_tti);
self
}

pub fn build(self) -> CachedMetaKvBackend {
let cache_max_capacity = self
.cached_max_capacity
.unwrap_or(DEFAULT_CACHE_MAX_CAPACITY);
let cache_ttl = self
.cached_ttl
.unwrap_or(Duration::from_secs(DEFAULT_CACHE_TTL_SECOND));
let cache_tti = self
.cached_tti
.unwrap_or(Duration::from_secs(DEFAULT_CACHE_TTI_SECOND));

let cache = Arc::new(
CacheBuilder::new(cache_max_capacity)
.time_to_live(cache_ttl)
.time_to_idle(cache_tti)
.build(),
);

let kv_backend = Arc::new(MetaKvBackend {
client: self.meta_client,
});
let name = format!("CachedKvBackend({})", kv_backend.name());
let version = AtomicUsize::new(0);

CachedMetaKvBackend {
kv_backend,
cache,
name,
version,
}
}
}

pub type CacheBackendRef = Arc<Cache<Vec<u8>, KeyValue>>;

Expand Down Expand Up @@ -242,16 +307,13 @@ impl KvCacheInvalidator for CachedMetaKvBackend {
}

impl CachedMetaKvBackend {
pub fn new(client: Arc<MetaClient>) -> Self {
let kv_backend = Arc::new(MetaKvBackend { client });
Self::wrap(kv_backend)
}

pub fn wrap(kv_backend: KvBackendRef) -> Self {
// only for test
#[cfg(test)]
fn wrap(kv_backend: KvBackendRef) -> Self {
let cache = Arc::new(
CacheBuilder::new(CACHE_MAX_CAPACITY)
.time_to_live(Duration::from_secs(CACHE_TTL_SECOND))
.time_to_idle(Duration::from_secs(CACHE_TTI_SECOND))
CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(Duration::from_secs(DEFAULT_CACHE_TTL_SECOND))
.time_to_idle(Duration::from_secs(DEFAULT_CACHE_TTI_SECOND))
.build(),
);

Expand Down
7 changes: 5 additions & 2 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;

use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager};
use catalog::kvbackend::{
CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager,
};
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_error::ext::ErrorExt;
Expand Down Expand Up @@ -248,7 +250,8 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
.context(StartMetaClientSnafu)?;
let meta_client = Arc::new(meta_client);

let cached_meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
let cached_meta_backend =
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());

let catalog_list =
KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend);
Expand Down
22 changes: 17 additions & 5 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use catalog::kvbackend::CachedMetaKvBackend;
use catalog::kvbackend::CachedMetaKvBackendBuilder;
use clap::Parser;
use client::client_manager::DatanodeClients;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
Expand Down Expand Up @@ -228,15 +228,27 @@ impl StartCommand {
let meta_client_options = opts.meta_client.as_ref().context(MissingConfigSnafu {
msg: "'meta_client'",
})?;

let cached_max_capacity = meta_client_options.cached_max_capacity;
let cached_ttl = meta_client_options.cached_ttl;
let cached_tti = meta_client_options.cached_tti;

let meta_client = FeInstance::create_meta_client(meta_client_options)
.await
.context(StartFrontendSnafu)?;

let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
.cached_max_capacity(cached_max_capacity)
.cached_ttl_second(cached_ttl)
.cached_tti_second(cached_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);

let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(meta_backend.clone())),
Arc::new(InvalidateTableCacheHandler::new(
cached_meta_backend.clone(),
)),
]);

let heartbeat_task = HeartbeatTask::new(
Expand All @@ -246,11 +258,11 @@ impl StartCommand {
);

let mut instance = FrontendBuilder::new(
meta_backend.clone(),
cached_meta_backend.clone(),
Arc::new(DatanodeClients::default()),
meta_client,
)
.with_cache_invalidator(meta_backend)
.with_cache_invalidator(cached_meta_backend)
.with_plugin(plugins.clone())
.with_heartbeat_task(heartbeat_task)
.try_build()
Expand Down
23 changes: 23 additions & 0 deletions src/meta-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ pub struct MetaClientOptions {
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
pub tcp_nodelay: bool,
#[serde(default = "default_cached_max_capacity")]
pub cached_max_capacity: u64,
#[serde(default = "default_cached_ttl")]
#[serde(with = "humantime_serde")]
pub cached_ttl: Duration,
#[serde(default = "default_cached_tti")]
#[serde(with = "humantime_serde")]
pub cached_tti: Duration,
}

fn default_heartbeat_timeout() -> Duration {
Expand All @@ -54,6 +62,18 @@ fn default_timeout() -> Duration {
Duration::from_millis(3_000u64)
}

fn default_cached_max_capacity() -> u64 {
100_000u64
}

fn default_cached_ttl() -> Duration {
Duration::from_secs(600u64)
}

fn default_cached_tti() -> Duration {
Duration::from_secs(300u64)
}

impl Default for MetaClientOptions {
fn default() -> Self {
Self {
Expand All @@ -63,6 +83,9 @@ impl Default for MetaClientOptions {
ddl_timeout: default_ddl_timeout(),
connect_timeout: default_connect_timeout(),
tcp_nodelay: true,
cached_max_capacity: default_cached_max_capacity(),
cached_ttl: default_cached_ttl(),
cached_tti: default_cached_tti(),
}
}
}
Expand Down
22 changes: 13 additions & 9 deletions tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::time::Duration;
use api::v1::meta::Role;
use api::v1::region::region_server::RegionServer;
use arrow_flight::flight_service_server::FlightServiceServer;
use catalog::kvbackend::{CachedMetaKvBackend, MetaKvBackend};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, MetaKvBackend};
use client::client_manager::DatanodeClients;
use client::Client;
use common_base::Plugins;
Expand Down Expand Up @@ -350,11 +350,14 @@ impl GreptimeDbClusterBuilder {
meta_client.start(&[&meta_srv.server_addr]).await.unwrap();
let meta_client = Arc::new(meta_client);

let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
let cached_meta_backend =
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());

let handlers_executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(meta_backend.clone())),
Arc::new(InvalidateTableCacheHandler::new(
cached_meta_backend.clone(),
)),
]);

let heartbeat_task = HeartbeatTask::new(
Expand All @@ -363,12 +366,13 @@ impl GreptimeDbClusterBuilder {
Arc::new(handlers_executor),
);

let instance = FrontendBuilder::new(meta_backend.clone(), datanode_clients, meta_client)
.with_cache_invalidator(meta_backend)
.with_heartbeat_task(heartbeat_task)
.try_build()
.await
.unwrap();
let instance =
FrontendBuilder::new(cached_meta_backend.clone(), datanode_clients, meta_client)
.with_cache_invalidator(cached_meta_backend)
.with_heartbeat_task(heartbeat_task)
.try_build()
.await
.unwrap();

Arc::new(instance)
}
Expand Down

0 comments on commit 3c898c7

Please sign in to comment.