Skip to content

Commit

Permalink
feat: fetch region routes in batch
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Feb 18, 2024
1 parent 72bac63 commit e0200d9
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 32 deletions.
7 changes: 2 additions & 5 deletions src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,8 @@ pub enum Error {
table: String,
},

#[snafu(display("Failed to find region routes: #{table}"))]
FindRegionRoutes {
source: partition::error::Error,
table: String,
},
#[snafu(display("Failed to find region routes"))]
FindRegionRoutes { source: partition::error::Error },

#[snafu(display("Failed to read system catalog table records"))]
ReadSystemCatalog {
Expand Down
44 changes: 27 additions & 17 deletions src/catalog/src/information_schema/region_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use core::pin::pin;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
Expand All @@ -28,7 +29,7 @@ use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{Int64VectorBuilder, StringVectorBuilder, UInt64VectorBuilder};
use futures::TryStreamExt;
use futures::{StreamExt, TryStreamExt};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::TableType;
Expand Down Expand Up @@ -176,29 +177,38 @@ impl InformationSchemaRegionPeersBuilder {
let predicates = Predicates::from_scan_request(&request);

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
let table_id_stream = catalog_manager
.tables(&catalog_name, &schema_name)
.await
.try_filter_map(|t| async move {
let table_info = t.table_info();
if table_info.table_type == TableType::Temporary {
Ok(None)
} else {
Ok(Some(table_info.ident.table_id))
}
});

while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();
const BATCH_SIZE: usize = 128;

if table_info.table_type == TableType::Temporary {
continue;
}
// Split table ids into chunks
let mut table_id_chunks = pin!(table_id_stream.ready_chunks(BATCH_SIZE));

while let Some(table_ids) = table_id_chunks.next().await {
let table_ids = table_ids.into_iter().collect::<Result<Vec<_>>>()?;

let table_id = table_info.ident.table_id;
let routes = if let Some(partition_manager) = &partition_manager {
let table_routes = if let Some(partition_manager) = &partition_manager {
partition_manager
.find_region_routes(table_id)
.find_region_routes_batch(&table_ids)
.await
.context(FindRegionRoutesSnafu {
table: &table_info.name,
})?
.context(FindRegionRoutesSnafu)?
} else {
// Standalone doesn't has route values.
vec![]
table_ids.into_iter().map(|id| (id, vec![])).collect()
};

self.add_region_peers(&predicates, &routes);
for routes in table_routes.values() {
self.add_region_peers(&predicates, routes);
}
}
}

Expand All @@ -211,7 +221,7 @@ impl InformationSchemaRegionPeersBuilder {
let peer_id = route.leader_peer.clone().map(|p| p.id);
let peer_addr = route.leader_peer.clone().map(|p| p.addr);
let status = if let Some(status) = route.leader_status {
Some(status.name().to_string())
Some(status.as_ref().to_string())
} else {
// Alive by default
Some("ALIVE".to_string())
Expand Down
13 changes: 3 additions & 10 deletions src/common/meta/src/rpc/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use snafu::OptionExt;
use store_api::storage::{RegionId, RegionNumber};
use strum::AsRefStr;

use crate::error::{self, Result};
use crate::key::RegionDistribution;
Expand Down Expand Up @@ -277,7 +278,8 @@ impl RegionRouteBuilder {

/// The Status of the [Region].
/// TODO(dennis): It's better to add more fine-grained statuses such as `PENDING` etc.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq)]
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, AsRefStr)]
#[strum(serialize_all = "UPPERCASE")]
pub enum RegionStatus {
/// The following cases in which the [Region] will be downgraded.
///
Expand All @@ -286,15 +288,6 @@ pub enum RegionStatus {
Downgraded,
}

impl RegionStatus {
/// Returns the status name.
pub fn name(&self) -> &str {
match self {
RegionStatus::Downgraded => "DOWNGRADED",
}
}
}

impl RegionRoute {
/// Returns true if the Leader [Region] is downgraded.
///
Expand Down
23 changes: 23 additions & 0 deletions src/partition/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,29 @@ impl PartitionRuleManager {
Ok(route.region_routes)
}

pub async fn find_region_routes_batch(
&self,
table_ids: &[TableId],
) -> Result<HashMap<TableId, Vec<RegionRoute>>> {
let table_routes = self
.table_route_manager
.batch_get(table_ids)
.await
.context(error::TableRouteManagerSnafu)?;

let mut table_region_routes = HashMap::with_capacity(table_routes.len());

for (table_id, table_route) in table_routes {
let region_routes = table_route
.region_routes()
.context(error::TableRouteManagerSnafu)?
.clone();
table_region_routes.insert(table_id, region_routes);
}

Ok(table_region_routes)
}

pub async fn find_table_partitions(&self, table_id: TableId) -> Result<Vec<PartitionInfo>> {
let region_routes = self.find_region_routes(table_id).await?;
ensure!(
Expand Down

0 comments on commit e0200d9

Please sign in to comment.