feat: tableref cache (#3420)
* feat: tableref cache

* chore: minor refactor

* chore: avoid to string

* chore: change log level

* feat: add metrics for prometheus remote write decode
fengjiachun authored Mar 2, 2024
1 parent 00cbbc9 commit 2d975e4
Showing 8 changed files with 111 additions and 43 deletions.
7 changes: 7 additions & 0 deletions src/catalog/src/error.rs
@@ -251,6 +251,12 @@ pub enum Error {
         source: common_meta::error::Error,
         location: Location,
     },
+
+    #[snafu(display("Get null from table cache, key: {}", key))]
+    TableCacheNotGet { key: String, location: Location },
+
+    #[snafu(display("Failed to get table cache, err: {}", err_msg))]
+    GetTableCache { err_msg: String },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -311,6 +317,7 @@ impl ErrorExt for Error {
             Error::QueryAccessDenied { .. } => StatusCode::AccessDenied,
             Error::Datafusion { .. } => StatusCode::EngineExecuteQuery,
             Error::TableMetadataManager { source, .. } => source.status_code(),
+            Error::TableCacheNotGet { .. } | Error::GetTableCache { .. } => StatusCode::Internal,
         }
     }
 
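A note for readers: snafu derives a context selector per variant (the variant name with a `Snafu` suffix), which is what the `TableCacheNotGetSnafu { .. }.fail()` calls in `manager.rs` below rely on. A minimal sketch of that mechanism, with the `location` field omitted for brevity:

use snafu::Snafu;

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Get null from table cache, key: {}", key))]
    TableCacheNotGet { key: String },
}

fn lookup(key: &str) -> Result<(), Error> {
    // The derive generates a `TableCacheNotGetSnafu` selector; `fail()`
    // builds the variant and returns it wrapped in `Err`.
    TableCacheNotGetSnafu { key }.fail()
}
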
26 changes: 11 additions & 15 deletions src/catalog/src/kvbackend/client.rs
@@ -82,12 +82,10 @@ impl CachedMetaKvBackendBuilder {
         let cache_ttl = self.cache_ttl.unwrap_or(DEFAULT_CACHE_TTL);
         let cache_tti = self.cache_tti.unwrap_or(DEFAULT_CACHE_TTI);
 
-        let cache = Arc::new(
-            CacheBuilder::new(cache_max_capacity)
-                .time_to_live(cache_ttl)
-                .time_to_idle(cache_tti)
-                .build(),
-        );
+        let cache = CacheBuilder::new(cache_max_capacity)
+            .time_to_live(cache_ttl)
+            .time_to_idle(cache_tti)
+            .build();
 
         let kv_backend = Arc::new(MetaKvBackend {
             client: self.meta_client,
@@ -104,7 +102,7 @@ impl CachedMetaKvBackendBuilder {
     }
 }
 
-pub type CacheBackendRef = Arc<Cache<Vec<u8>, KeyValue>>;
+pub type CacheBackend = Cache<Vec<u8>, KeyValue>;
 
 /// A wrapper of `MetaKvBackend` with cache support.
 ///
@@ -117,7 +115,7 @@ pub type CacheBackendRef = Arc<Cache<Vec<u8>, KeyValue>>;
 /// TTL and TTI for cache.
 pub struct CachedMetaKvBackend {
     kv_backend: KvBackendRef,
-    cache: CacheBackendRef,
+    cache: CacheBackend,
     name: String,
     version: AtomicUsize,
 }
@@ -317,12 +315,10 @@ impl CachedMetaKvBackend {
     // only for test
     #[cfg(test)]
    fn wrap(kv_backend: KvBackendRef) -> Self {
-        let cache = Arc::new(
-            CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
-                .time_to_live(DEFAULT_CACHE_TTL)
-                .time_to_idle(DEFAULT_CACHE_TTI)
-                .build(),
-        );
+        let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
+            .time_to_live(DEFAULT_CACHE_TTL)
+            .time_to_idle(DEFAULT_CACHE_TTI)
+            .build();
 
         let name = format!("CachedKvBackend({})", kv_backend.name());
         Self {
@@ -333,7 +329,7 @@ impl CachedMetaKvBackend {
         }
     }
 
-    pub fn cache(&self) -> &CacheBackendRef {
+    pub fn cache(&self) -> &CacheBackend {
         &self.cache
     }
 
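Dropping the `Arc` here is not a behavior change: a moka cache handle is already cheaply cloneable, and all clones share the same underlying store, so the extra `Arc` only added a second layer of indirection. A small sketch of that property (plain moka usage, not GreptimeDB code):

use moka::sync::Cache;

fn main() {
    let cache: Cache<String, String> = Cache::new(100);
    // Clones are shallow handles onto the same underlying cache.
    let handle = cache.clone();
    cache.insert("key".to_string(), "value".to_string());
    assert_eq!(handle.get("key"), Some("value".to_string()));
}
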
91 changes: 67 additions & 24 deletions src/catalog/src/kvbackend/manager.rs
@@ -15,11 +15,13 @@
 use std::any::Any;
 use std::collections::BTreeSet;
 use std::sync::{Arc, Weak};
+use std::time::Duration;
 
 use async_stream::try_stream;
 use common_catalog::consts::{
     DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
 };
+use common_catalog::format_full_table_name;
 use common_error::ext::BoxedError;
 use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
 use common_meta::error::Result as MetaResult;
@@ -32,6 +34,7 @@ use common_meta::kv_backend::KvBackendRef;
 use common_meta::table_name::TableName;
 use futures_util::stream::BoxStream;
 use futures_util::{StreamExt, TryStreamExt};
+use moka::future::{Cache as AsyncCache, CacheBuilder};
 use moka::sync::Cache;
 use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
 use snafu::prelude::*;
@@ -40,9 +43,10 @@ use table::metadata::TableId;
 use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
 use table::TableRef;
 
+use crate::error::Error::{GetTableCache, TableCacheNotGet};
 use crate::error::{
     self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu,
-    Result as CatalogResult, TableMetadataManagerSnafu,
+    Result as CatalogResult, TableCacheNotGetSnafu, TableMetadataManagerSnafu,
 };
 use crate::information_schema::InformationSchemaProvider;
 use crate::CatalogManager;
@@ -62,6 +66,7 @@ pub struct KvBackendCatalogManager {
     table_metadata_manager: TableMetadataManagerRef,
     /// A sub-CatalogManager that handles system tables
     system_catalog: SystemCatalog,
+    table_cache: AsyncCache<String, TableRef>,
 }
 
 fn make_table(table_info_value: TableInfoValue) -> CatalogResult<TableRef> {
@@ -81,13 +86,24 @@ impl CacheInvalidator for KvBackendCatalogManager {
     }
 
     async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
+        let table_cache_key = format_full_table_name(
+            &table_name.catalog_name,
+            &table_name.schema_name,
+            &table_name.table_name,
+        );
         self.cache_invalidator
             .invalidate_table_name(ctx, table_name)
-            .await
+            .await?;
+        self.table_cache.invalidate(&table_cache_key).await;
+
+        Ok(())
     }
 }
 
-const DEFAULT_CACHED_CATALOG: u64 = 128;
+const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;
+const TABLE_CACHE_MAX_CAPACITY: u64 = 65536;
+const TABLE_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
+const TABLE_CACHE_TTI: Duration = Duration::from_secs(5 * 60);
 
 impl KvBackendCatalogManager {
     pub fn new(backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef) -> Arc<Self> {
@@ -97,12 +113,16 @@ impl KvBackendCatalogManager {
             cache_invalidator,
             system_catalog: SystemCatalog {
                 catalog_manager: me.clone(),
-                catalog_cache: Cache::new(DEFAULT_CACHED_CATALOG),
+                catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
                 information_schema_provider: Arc::new(InformationSchemaProvider::new(
                     DEFAULT_CATALOG_NAME.to_string(),
                     me.clone(),
                 )),
             },
+            table_cache: CacheBuilder::new(TABLE_CACHE_MAX_CAPACITY)
+                .time_to_live(TABLE_CACHE_TTL)
+                .time_to_idle(TABLE_CACHE_TTI)
+                .build(),
         })
     }
 
@@ -217,29 +237,52 @@ impl CatalogManager for KvBackendCatalogManager {
             return Ok(Some(table));
         }
 
-        let key = TableNameKey::new(catalog, schema, table_name);
-        let Some(table_name_value) = self
-            .table_metadata_manager
-            .table_name_manager()
-            .get(key)
-            .await
-            .context(TableMetadataManagerSnafu)?
-        else {
-            return Ok(None);
-        };
-        let table_id = table_name_value.table_id();
-
-        let Some(table_info_value) = self
-            .table_metadata_manager
-            .table_info_manager()
-            .get(table_id)
-            .await
-            .context(TableMetadataManagerSnafu)?
-            .map(|v| v.into_inner())
-        else {
-            return Ok(None);
-        };
-        make_table(table_info_value).map(Some)
+        let init = async {
+            let table_name_key = TableNameKey::new(catalog, schema, table_name);
+            let Some(table_name_value) = self
+                .table_metadata_manager
+                .table_name_manager()
+                .get(table_name_key)
+                .await
+                .context(TableMetadataManagerSnafu)?
+            else {
+                return TableCacheNotGetSnafu {
+                    key: table_name_key.to_string(),
+                }
+                .fail();
+            };
+            let table_id = table_name_value.table_id();
+
+            let Some(table_info_value) = self
+                .table_metadata_manager
+                .table_info_manager()
+                .get(table_id)
+                .await
+                .context(TableMetadataManagerSnafu)?
+                .map(|v| v.into_inner())
+            else {
+                return TableCacheNotGetSnafu {
+                    key: table_name_key.to_string(),
+                }
+                .fail();
+            };
+            make_table(table_info_value)
+        };
+
+        match self
+            .table_cache
+            .try_get_with_by_ref(&format_full_table_name(catalog, schema, table_name), init)
+            .await
+        {
+            Ok(table) => Ok(Some(table)),
+            Err(err) => match err.as_ref() {
+                TableCacheNotGet { .. } => Ok(None),
+                _ => Err(err),
+            },
+        }
+        .map_err(|err| GetTableCache {
+            err_msg: err.to_string(),
+        })
     }
 
     async fn tables<'a>(
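The hunk above is the heart of the change: table lookups now go through moka's `try_get_with_by_ref`, which runs the `init` future on a miss, deduplicates concurrent loads of the same key, and caches only successful results. The `TableCacheNotGet` error doubles as a 'not found' sentinel, so missing tables map back to `Ok(None)` instead of being cached. A stripped-down sketch of the pattern with illustrative types (assuming moka with its `future` feature and tokio; not the actual GreptimeDB types):

use std::sync::Arc;

use moka::future::Cache;

#[derive(Debug)]
enum FetchError {
    NotFound,
}

// Load-through lookup: on a miss the `init` future runs (once per key, even
// under concurrent callers); failed loads come back as `Arc<FetchError>` and
// are not stored in the cache.
async fn get_or_load(cache: &Cache<String, Arc<str>>, key: &str) -> Option<Arc<str>> {
    let init = async {
        // Stand-in for the remote metadata lookup; a missing table is
        // signalled with a sentinel error rather than cached as a value.
        Err::<Arc<str>, FetchError>(FetchError::NotFound)
    };
    match cache.try_get_with_by_ref(key, init).await {
        Ok(value) => Some(value),
        // moka wraps the init error in an `Arc`, hence `err.as_ref()`.
        Err(err) => match err.as_ref() {
            FetchError::NotFound => None,
        },
    }
}

#[tokio::main]
async fn main() {
    let cache: Cache<String, Arc<str>> = Cache::new(100);
    assert!(get_or_load(&cache, "greptime.public.metrics").await.is_none());
}
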
4 changes: 2 additions & 2 deletions src/common/meta/src/ddl_manager.rs
@@ -16,7 +16,7 @@ use std::sync::Arc;
 
 use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId};
 use common_telemetry::tracing_context::{FutureExt, TracingContext};
-use common_telemetry::{info, tracing};
+use common_telemetry::{debug, info, tracing};
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::TableId;
 
@@ -545,7 +545,7 @@ impl ProcedureExecutor for DdlManager {
             .attach(tracing::info_span!("DdlManager::submit_ddl_task"));
         async move {
             let cluster_id = ctx.cluster_id.unwrap_or_default();
-            info!("Submitting Ddl task: {:?}", request.task);
+            debug!("Submitting Ddl task: {:?}", request.task);
             match request.task {
                 CreateTable(create_table_task) => {
                     handle_create_table_task(self, cluster_id, create_table_task).await
7 changes: 6 additions & 1 deletion src/servers/src/http/prom_store.rs
@@ -128,13 +128,18 @@ pub async fn remote_read(
 }
 
 async fn decode_remote_write_request(body: Body) -> Result<WriteRequest> {
+    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
     let body = hyper::body::to_bytes(body)
         .await
         .context(error::HyperSnafu)?;
 
     let buf = snappy_decompress(&body[..])?;
 
-    WriteRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)
+    let request = WriteRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)?;
+    crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES
+        .observe(request.timeseries.len() as f64);
+
+    Ok(request)
 }
 
 async fn decode_remote_read_request(body: Body) -> Result<ReadRequest> {
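For reference, `start_timer()` returns a guard that records the elapsed seconds into the histogram when it is dropped, so binding it to `_timer` times the rest of the function body. A self-contained sketch of the same pattern (the metric name here is illustrative):

use lazy_static::lazy_static;
use prometheus::{register_histogram, Histogram};

lazy_static! {
    static ref DECODE_ELAPSED: Histogram =
        register_histogram!("example_decode_elapsed", "example decode elapsed").unwrap();
}

fn decode(payload: &[u8]) -> usize {
    // Dropped at the end of the scope, recording one elapsed-time sample.
    let _timer = DECODE_ELAPSED.start_timer();
    payload.len() // stand-in for snappy decompression + protobuf decode
}

fn main() {
    decode(b"example");
    assert_eq!(DECODE_ELAPSED.get_sample_count(), 1);
}
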
15 changes: 15 additions & 0 deletions src/servers/src/metrics.rs
@@ -78,6 +78,21 @@ lazy_static! {
         &[METRIC_DB_LABEL]
     )
     .unwrap();
+    pub static ref METRIC_HTTP_PROM_STORE_DECODE_ELAPSED: Histogram = register_histogram!(
+        "greptime_servers_http_prometheus_decode_elapsed",
+        "servers http prometheus decode elapsed",
+    )
+    .unwrap();
+    pub static ref METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES: Histogram = register_histogram!(
+        "greptime_servers_http_prometheus_decode_num_series",
+        "servers http prometheus decode num series",
+    )
+    .unwrap();
+    pub static ref METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED: Histogram = register_histogram!(
+        "greptime_servers_http_prometheus_convert_elapsed",
+        "servers http prometheus convert to gRPC request elapsed",
+    )
+    .unwrap();
     pub static ref METRIC_HTTP_PROM_STORE_READ_ELAPSED: HistogramVec = register_histogram_vec!(
         "greptime_servers_http_prometheus_read_elapsed",
         "servers http prometheus read elapsed",
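One caveat: `register_histogram!` with only a name and help string uses the prometheus crate's default buckets (roughly 0.005 to 10, tuned for latencies in seconds). That suits the two elapsed metrics, but it is a questionable fit for a count distribution like `METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES`; the macro also accepts explicit buckets. A sketch with hypothetical count-oriented buckets (not part of this commit):

use prometheus::{register_histogram, Histogram};

fn main() {
    let num_series: Histogram = register_histogram!(
        "example_decode_num_series",
        "number of series per remote write request",
        vec![1.0, 10.0, 100.0, 1_000.0, 10_000.0, 100_000.0]
    )
    .unwrap();
    num_series.observe(42.0);
}
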
2 changes: 2 additions & 0 deletions src/servers/src/prom_store.rs
@@ -299,6 +299,8 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Ve
 }
 
 pub fn to_grpc_row_insert_requests(request: WriteRequest) -> Result<(RowInsertRequests, usize)> {
+    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
+
     let mut multi_table_data = MultiTableData::new();
 
     for series in &request.timeseries {
2 changes: 1 addition & 1 deletion src/servers/src/row_writer.rs
@@ -212,7 +212,7 @@ fn write_by_semantic_type(
         let index = column_indexes.entry(name.clone()).or_insert(schema.len());
         if *index == schema.len() {
             schema.push(ColumnSchema {
-                column_name: name.to_string(),
+                column_name: name,
                 datatype: datatype as i32,
                 semantic_type: semantic_type as i32,
                 ..Default::default()
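This is the 'avoid to string' part of the commit: `name` is already an owned `String` at this point (note the `name.clone()` fed to the entry lookup above), so `name.to_string()` allocated a second copy only to move it into `column_name`; passing `name` by value reuses the existing allocation.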
