Skip to content

Commit

Permalink
chore: fix cr issue
Browse files Browse the repository at this point in the history
  • Loading branch information
shuiyisong committed Apr 18, 2023
1 parent 1af6212 commit 7cfcde7
Showing 1 changed file with 22 additions and 35 deletions.
57 changes: 22 additions & 35 deletions src/catalog/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use common_catalog::consts::{
use common_telemetry::{debug, error, info};
use dashmap::DashMap;
use futures::Stream;
use futures_util::StreamExt;
use futures_util::{StreamExt, TryStreamExt};
use key_lock::KeyLock;
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt};
Expand All @@ -37,10 +37,10 @@ use table::requests::{CreateTableRequest, OpenTableRequest};
use table::TableRef;
use tokio::sync::Mutex;

use crate::error::Error::ParallelOpenTable;
use crate::error::{
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu, Result,
SchemaNotFoundSnafu, TableEngineNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu,
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu,
ParallelOpenTableSnafu, Result, SchemaNotFoundSnafu, TableEngineNotFoundSnafu,
TableExistsSnafu, UnimplementedSnafu,
};
use crate::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue,
Expand Down Expand Up @@ -257,41 +257,28 @@ impl RemoteCatalogManager {
info!("initializing tables in {}.{}", catalog_name, schema_name);
let mut table_num = 0;
let tables = self.iter_remote_tables(catalog_name, schema_name).await;
let kvs = tables.collect::<Vec<_>>().await;
let node_id = self.node_id;

let mut joins = kvs
let kvs = tables.try_collect::<Vec<_>>().await?;
let node_id = self.node_id;
let joins = kvs
.into_iter()
.map(|kv| {
let (table_key, table_value) = kv?;
.map(|(table_key, table_value)| {
let engine_manager = self.engine_manager.clone();
Ok(tokio::spawn(async move {
let table_ref =
open_or_create_table(node_id, engine_manager, &table_key, &table_value)
.await?;
Ok(table_ref)
}))
common_runtime::spawn_bg(async move {
open_or_create_table(node_id, engine_manager, &table_key, &table_value).await
})
})
.collect::<Result<Vec<_>>>()?;

while !joins.is_empty() {
match futures::future::select_all(joins).await {
(Ok(join_re), _, remaining) => {
joins = remaining;
let table_ref = join_re?;
let table_info = table_ref.table_info();

let table_name = &table_info.name;
let table_id = table_info.ident.table_id;
schema.register_table(table_name.clone(), table_ref)?;
info!("Registered table {}", table_name);
max_table_id = max_table_id.max(table_id);
table_num += 1;
}
(Err(source), _, _) => {
return Err(ParallelOpenTable { source });
}
}
.collect::<Vec<_>>();
let vec = futures::future::join_all(joins).await;
for res in vec {
let table_ref = res.context(ParallelOpenTableSnafu)??;
let table_info = table_ref.table_info();
let table_name = &table_info.name;
let table_id = table_info.ident.table_id;
schema.register_table(table_name.clone(), table_ref)?;
info!("Registered table {}", table_name);
max_table_id = max_table_id.max(table_id);
table_num += 1;
}

info!(
Expand Down

0 comments on commit 7cfcde7

Please sign in to comment.