Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: http route directly from meta #1221

Merged
merged 2 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions proxy/src/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use std::{

use async_trait::async_trait;
use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, RequestContext, RouteRequest,
storage_service_client::StorageServiceClient, RequestContext, RouteRequest as RouteRequestPb,
};
use log::{debug, error, warn};
use macros::define_result;
use router::{endpoint::Endpoint, RouterRef};
use router::{endpoint::Endpoint, RouteRequest, RouterRef};
use serde::{Deserialize, Serialize};
use snafu::{Backtrace, ResultExt, Snafu};
use time_ext::ReadableDuration;
Expand Down Expand Up @@ -282,12 +282,13 @@ impl<B: ClientBuilder> Forwarder<B> {
forwarded_from,
} = forward_req;

let route_req = RouteRequest {
let req_pb = RouteRequestPb {
context: Some(RequestContext { database: schema }),
tables: vec![table],
};

let endpoint = match self.router.route(route_req).await {
let request = RouteRequest::new(req_pb, true);
let endpoint = match self.router.route(request).await {
Ok(mut routes) => {
if routes.len() != 1 || routes[0].endpoint.is_none() {
warn!(
Expand Down Expand Up @@ -420,11 +421,11 @@ mod tests {
#[async_trait]
impl Router for MockRouter {
async fn route(&self, req: RouteRequest) -> router::Result<Vec<Route>> {
let endpoint = self.routing_tables.get(&req.tables[0]);
let endpoint = self.routing_tables.get(&req.inner.tables[0]);
match endpoint {
None => Ok(vec![]),
Some(v) => Ok(vec![Route {
table: req.tables[0].clone(),
table: req.inner.tables[0].clone(),
endpoint: Some(v.clone().into()),
}]),
}
Expand Down
8 changes: 5 additions & 3 deletions proxy/src/grpc/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use ceresdbproto::storage::{RouteRequest, RouteResponse};
use ceresdbproto::storage::{RouteRequest as RouteRequestPb, RouteResponse};
use router::RouteRequest;

use crate::{error, metrics::GRPC_HANDLER_COUNTER_VEC, Context, Proxy};

impl Proxy {
pub async fn handle_route(&self, _ctx: Context, req: RouteRequest) -> RouteResponse {
let routes = self.route(req).await;
pub async fn handle_route(&self, _ctx: Context, req: RouteRequestPb) -> RouteResponse {
let request = RouteRequest::new(req, true);
let routes = self.route(request).await;

let mut resp = RouteResponse::default();
match routes {
Expand Down
13 changes: 9 additions & 4 deletions proxy/src/http/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use ceresdbproto::storage::RouteRequest;
use router::endpoint::Endpoint;
use ceresdbproto::storage::RouteRequest as RouteRequestPb;
use router::{endpoint::Endpoint, RouteRequest};
use serde::Serialize;

use crate::{context::RequestContext, error::Result, Proxy};
Expand All @@ -39,14 +39,19 @@ impl Proxy {
return Ok(RouteResponse { routes: vec![] });
}

let route_req = RouteRequest {
let req_pb = RouteRequestPb {
context: Some(ceresdbproto::storage::RequestContext {
database: ctx.schema.clone(),
}),
tables: vec![table.to_string()],
};

let routes = self.route(route_req).await?;
let request = RouteRequest {
route_with_cache: false,
inner: req_pb,
};

let routes = self.route(request).await?;

let routes = routes
.into_iter()
Expand Down
4 changes: 2 additions & 2 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use catalog::{
};
use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, PrometheusRemoteQueryRequest,
PrometheusRemoteQueryResponse, Route, RouteRequest,
PrometheusRemoteQueryResponse, Route,
};
use common_types::{request_id::RequestId, table::DEFAULT_SHARD_ID, ENABLE_TTL, TTL};
use datafusion::{
Expand All @@ -70,7 +70,7 @@ use interpreters::{
};
use log::{error, info};
use query_frontend::plan::Plan;
use router::{endpoint::Endpoint, Router};
use router::{endpoint::Endpoint, RouteRequest, Router};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use table_engine::{
Expand Down
20 changes: 10 additions & 10 deletions proxy/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use std::{

use bytes::Bytes;
use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, value, RouteRequest, Value, WriteRequest,
WriteResponse as WriteResponsePB, WriteSeriesEntry, WriteTableRequest,
storage_service_client::StorageServiceClient, value, RouteRequest as RouteRequestPb, Value,
WriteRequest, WriteResponse as WriteResponsePB, WriteSeriesEntry, WriteTableRequest,
};
use cluster::config::SchemaConfig;
use common_types::{
Expand All @@ -45,7 +45,7 @@ use query_frontend::{
planner::{build_column_schema, try_get_data_type_from_value},
provider::CatalogMetaProvider,
};
use router::endpoint::Endpoint;
use router::{endpoint::Endpoint, RouteRequest};
use snafu::{ensure, OptionExt, ResultExt};
use table_engine::table::TableRef;
use tonic::transport::Channel;
Expand Down Expand Up @@ -310,13 +310,13 @@ impl Proxy {

// TODO: Make the router can accept an iterator over the tables to avoid the
// memory allocation here.
let route_data = self
.router
.route(RouteRequest {
context: req.context.clone(),
tables,
})
.await?;
let req_pb = RouteRequestPb {
context: req.context.clone(),
tables,
};
let request = RouteRequest::new(req_pb, true);
let route_data = self.router.route(request).await?;

let forwarded_table_routes = route_data
.into_iter()
.filter_map(|router| {
Expand Down
15 changes: 6 additions & 9 deletions remote_engine_client/src/cached_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{collections::HashMap, sync::RwLock};

use ceresdbproto::storage::{self, RequestContext};
use log::debug;
use router::{endpoint::Endpoint, RouterRef};
use router::{endpoint::Endpoint, RouteRequest, RouterRef};
use snafu::{OptionExt, ResultExt};
use table_engine::remote::model::TableIdentifier;
use tonic::transport::Channel as TonicChannel;
Expand Down Expand Up @@ -105,19 +105,16 @@ impl CachedRouter {
async fn do_route(&self, table_ident: &TableIdentifier) -> Result<RouteContext> {
let schema = &table_ident.schema;
let table = table_ident.table.clone();
let route_request = storage::RouteRequest {
let request_pb = storage::RouteRequest {
context: Some(RequestContext {
database: schema.to_string(),
}),
tables: vec![table],
};
let route_infos = self
.router
.route(route_request)
.await
.context(RouteWithCause {
table_ident: table_ident.clone(),
})?;
let request = RouteRequest::new(request_pb, true);
let route_infos = self.router.route(request).await.context(RouteWithCause {
table_ident: table_ident.clone(),
})?;

if route_infos.is_empty() {
return RouteNoCause {
Expand Down
79 changes: 52 additions & 27 deletions router/src/cluster_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! A router based on the [`cluster::Cluster`].

use async_trait::async_trait;
use ceresdbproto::storage::{Route, RouteRequest};
use ceresdbproto::storage::Route;
use cluster::ClusterRef;
use generic_error::BoxError;
use log::trace;
Expand All @@ -24,7 +24,8 @@ use moka::future::Cache;
use snafu::ResultExt;

use crate::{
endpoint::Endpoint, OtherWithCause, ParseEndpoint, Result, RouteCacheConfig, Router, TableInfo,
endpoint::Endpoint, OtherWithCause, ParseEndpoint, Result, RouteCacheConfig, RouteRequest,
Router, TableInfo,
};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -75,22 +76,15 @@ impl ClusterBasedRouter {
miss
}

async fn route_with_cache(
async fn route_from_meta(
&self,
tables: &Vec<String>,
tables: &[String],
database: String,
) -> Result<Vec<RouteData>> {
// Firstly route table from local cache.
let mut routes = Vec::with_capacity(tables.len());
let miss = self.route_from_cache(tables, &mut routes);
trace!("Route from cache, miss:{miss:?}, routes:{routes:?}");

if miss.is_empty() {
return Ok(routes);
}
routes: &mut Vec<RouteData>,
) -> Result<()> {
let route_tables_req = RouteTablesRequest {
schema_name: database,
table_names: miss,
table_names: tables.to_owned(),
};

let route_resp = self
Expand Down Expand Up @@ -126,6 +120,33 @@ impl ClusterBasedRouter {
routes.push(route);
}
}

Ok(())
}

async fn route_internal(
&self,
tables: &[String],
database: String,
route_with_cache: bool,
) -> Result<Vec<RouteData>> {
// Firstly route table from local cache.
let mut routes = Vec::with_capacity(tables.len());
let miss = if route_with_cache {
self.route_from_cache(tables, &mut routes)
} else {
tables.to_owned()
};

trace!("Route from cache, miss:{miss:?}, routes:{routes:?}");

if miss.is_empty() {
return Ok(routes);
}

// If miss exists, route from meta.
self.route_from_meta(&miss, database, &mut routes).await?;
ShiKaiWi marked this conversation as resolved.
Show resolved Hide resolved

Ok(routes)
}
}
Expand All @@ -145,9 +166,12 @@ fn make_route(table_info: TableInfo, endpoint: Option<&str>) -> Result<RouteData
#[async_trait]
impl Router for ClusterBasedRouter {
async fn route(&self, req: RouteRequest) -> Result<Vec<Route>> {
let req_ctx = req.context.unwrap();
let route_data_vec = self.route_with_cache(&req.tables, req_ctx.database).await?;
Ok(route_data_vec
let req_ctx = req.inner.context.unwrap();
let route_datas = self
.route_internal(&req.inner.tables, req_ctx.database, req.route_with_cache)
.await?;

Ok(route_datas
.into_iter()
.map(|v| Route {
table: v.table_info.name,
Expand All @@ -158,7 +182,7 @@ impl Router for ClusterBasedRouter {

async fn fetch_table_info(&self, schema: &str, table: &str) -> Result<Option<TableInfo>> {
let mut route_data_vec = self
.route_with_cache(&vec![table.to_string()], schema.to_string())
.route_internal(&[table.to_string()], schema.to_string(), true)
.await?;
if route_data_vec.is_empty() {
return Ok(None);
Expand All @@ -174,7 +198,7 @@ impl Router for ClusterBasedRouter {
mod tests {
use std::{collections::HashMap, sync::Arc, thread::sleep, time::Duration};

use ceresdbproto::storage::RequestContext;
use ceresdbproto::storage::{RequestContext, RouteRequest as RouteRequestPb};
use cluster::{
shard_lock_manager::ShardLockManagerRef, shard_set::ShardRef, Cluster, ClusterNodesResp,
};
Expand Down Expand Up @@ -275,14 +299,15 @@ mod tests {

// first case get two tables, no one miss
let tables = vec![table1.to_string(), table2.to_string()];
let result = router
.route(RouteRequest {
context: Some(RequestContext {
database: String::from("public"),
}),
tables: tables.clone(),
})
.await;
let request_pb = RouteRequestPb {
context: Some(RequestContext {
database: String::from("public"),
}),
tables: tables.clone(),
};
let request = RouteRequest::new(request_pb, true);

let result = router.route(request).await;
assert_eq!(result.unwrap().len(), 2);

let mut routes = Vec::with_capacity(tables.len());
Expand Down
16 changes: 15 additions & 1 deletion router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub mod rule_based;
use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use ceresdbproto::storage::{Route, RouteRequest};
use ceresdbproto::storage::{Route, RouteRequest as RouteRequestPb};
pub use cluster_based::ClusterBasedRouter;
use macros::define_result;
use meta_client::types::TableInfo;
Expand Down Expand Up @@ -79,6 +79,20 @@ pub trait Router {
async fn fetch_table_info(&self, schema: &str, table: &str) -> Result<Option<TableInfo>>;
}

pub struct RouteRequest {
pub route_with_cache: bool,
pub inner: RouteRequestPb,
}

impl RouteRequest {
pub fn new(request: RouteRequestPb, route_with_cache: bool) -> Self {
Self {
route_with_cache,
inner: request,
}
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RouteCacheConfig {
/// Enable route cache, default false.
Expand Down
12 changes: 7 additions & 5 deletions router/src/rule_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
use std::collections::HashMap;

use async_trait::async_trait;
use ceresdbproto::storage::{self, Route, RouteRequest};
use ceresdbproto::storage::{self, Route};
use cluster::config::SchemaConfig;
use log::info;
use meta_client::types::ShardId;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt};

use crate::{endpoint::Endpoint, hash, Result, RouteNotFound, Router, ShardNotFound, TableInfo};
use crate::{
endpoint::Endpoint, hash, Result, RouteNotFound, RouteRequest, Router, ShardNotFound, TableInfo,
};

pub type ShardNodes = HashMap<ShardId, Endpoint>;

Expand Down Expand Up @@ -151,7 +153,7 @@ impl RuleBasedRouter {
#[async_trait]
impl Router for RuleBasedRouter {
async fn route(&self, req: RouteRequest) -> Result<Vec<Route>> {
let req_ctx = req.context.unwrap();
let req_ctx = req.inner.context.unwrap();
let schema = &req_ctx.database;
if let Some(shard_nodes) = self.cluster_view.schema_shards.get(schema) {
ensure!(!shard_nodes.is_empty(), RouteNotFound { schema });
Expand All @@ -161,8 +163,8 @@ impl Router for RuleBasedRouter {

// TODO(yingwen): Better way to get total shard number
let total_shards = shard_nodes.len();
let mut route_results = Vec::with_capacity(req.tables.len());
for table in req.tables {
let mut route_results = Vec::with_capacity(req.inner.tables.len());
for table in req.inner.tables {
let shard_id = Self::route_table(&table, rule_list_opt, total_shards);

let endpoint = shard_nodes.get(&shard_id).with_context(|| ShardNotFound {
Expand Down
Loading