Skip to content

Commit

Permalink
Merge branch 'develop' into chore/use-alicloud-imagehub
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r authored Apr 18, 2023
2 parents 8324b4e + 0e4d4f0 commit 604ae9d
Show file tree
Hide file tree
Showing 12 changed files with 257 additions and 207 deletions.
108 changes: 60 additions & 48 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ members = [
]

[workspace.package]
version = "0.1.1"
version = "0.2.0"
edition = "2021"
license = "Apache-2.0"

Expand Down
1 change: 1 addition & 0 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ datafusion.workspace = true
datatypes = { path = "../datatypes" }
futures = "0.3"
futures-util.workspace = true
key-lock = "0.1"
lazy_static = "1.4"
meta-client = { path = "../meta-client" }
parking_lot = "0.12"
Expand Down
7 changes: 6 additions & 1 deletion src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use common_error::prelude::{Snafu, StatusCode};
use datafusion::error::DataFusionError;
use datatypes::prelude::ConcreteDataType;
use snafu::Location;
use tokio::task::JoinError;

use crate::DeregisterTableRequest;

Expand Down Expand Up @@ -127,6 +128,9 @@ pub enum Error {
source: table::error::Error,
},

#[snafu(display("Failed to open table in parallel, source: {}", source))]
ParallelOpenTable { source: JoinError },

#[snafu(display("Table not found while opening table, table info: {}", table_info))]
TableNotFound {
table_info: String,
Expand Down Expand Up @@ -261,7 +265,8 @@ impl ErrorExt for Error {
| Error::IllegalManagerState { .. }
| Error::CatalogNotFound { .. }
| Error::InvalidEntryType { .. }
| Error::InvalidSystemTableDef { .. } => StatusCode::Unexpected,
| Error::InvalidSystemTableDef { .. }
| Error::ParallelOpenTable { .. } => StatusCode::Unexpected,

Error::SystemCatalog { .. }
| Error::EmptyValue { .. }
Expand Down
220 changes: 119 additions & 101 deletions src/catalog/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use common_catalog::consts::{
use common_telemetry::{debug, error, info};
use dashmap::DashMap;
use futures::Stream;
use futures_util::StreamExt;
use futures_util::{StreamExt, TryStreamExt};
use key_lock::KeyLock;
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt};
use table::engine::manager::TableEngineManagerRef;
Expand All @@ -37,8 +38,9 @@ use table::TableRef;
use tokio::sync::Mutex;

use crate::error::{
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu, Result,
SchemaNotFoundSnafu, TableEngineNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu,
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu,
ParallelOpenTableSnafu, Result, SchemaNotFoundSnafu, TableEngineNotFoundSnafu,
TableExistsSnafu, UnimplementedSnafu,
};
use crate::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue,
Expand Down Expand Up @@ -254,15 +256,31 @@ impl RemoteCatalogManager {
) -> Result<()> {
info!("initializing tables in {}.{}", catalog_name, schema_name);
let mut table_num = 0;
let mut tables = self.iter_remote_tables(catalog_name, schema_name).await;
while let Some(r) = tables.next().await {
let (table_key, table_value) = r?;
let table_ref = self.open_or_create_table(&table_key, &table_value).await?;
schema.register_table(table_key.table_name.to_string(), table_ref)?;
info!("Registered table {}", &table_key.table_name);
max_table_id = max_table_id.max(table_value.table_id());
let tables = self.iter_remote_tables(catalog_name, schema_name).await;

let kvs = tables.try_collect::<Vec<_>>().await?;
let node_id = self.node_id;
let joins = kvs
.into_iter()
.map(|(table_key, table_value)| {
let engine_manager = self.engine_manager.clone();
common_runtime::spawn_bg(async move {
open_or_create_table(node_id, engine_manager, &table_key, &table_value).await
})
})
.collect::<Vec<_>>();
let vec = futures::future::join_all(joins).await;
for res in vec {
let table_ref = res.context(ParallelOpenTableSnafu)??;
let table_info = table_ref.table_info();
let table_name = &table_info.name;
let table_id = table_info.ident.table_id;
schema.register_table(table_name.clone(), table_ref)?;
info!("Registered table {}", table_name);
max_table_id = max_table_id.max(table_id);
table_num += 1;
}

info!(
"initialized tables in {}.{}, total: {}",
catalog_name, schema_name, table_num
Expand Down Expand Up @@ -310,87 +328,88 @@ impl RemoteCatalogManager {
info!("Created catalog '{catalog_key}");
Ok(catalog_provider)
}
}

async fn open_or_create_table(
&self,
table_key: &TableGlobalKey,
table_value: &TableGlobalValue,
) -> Result<TableRef> {
let context = EngineContext {};
let TableGlobalKey {
catalog_name,
schema_name,
table_name,
..
} = table_key;

let table_id = table_value.table_id();

let TableGlobalValue {
table_info,
regions_id_map,
..
} = table_value;

// unwrap safety: checked in yielding this table when `iter_remote_tables`
let region_numbers = regions_id_map.get(&self.node_id).unwrap();

let request = OpenTableRequest {
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
table_id,
};
let engine = self
.engine_manager
async fn open_or_create_table(
node_id: u64,
engine_manager: TableEngineManagerRef,
table_key: &TableGlobalKey,
table_value: &TableGlobalValue,
) -> Result<TableRef> {
let context = EngineContext {};
let TableGlobalKey {
catalog_name,
schema_name,
table_name,
..
} = table_key;

let table_id = table_value.table_id();

let TableGlobalValue {
table_info,
regions_id_map,
..
} = table_value;

// unwrap safety: checked in yielding this table when `iter_remote_tables`
let region_numbers = regions_id_map.get(&node_id).unwrap();

let request = OpenTableRequest {
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
table_id,
};
let engine =
engine_manager
.engine(&table_info.meta.engine)
.context(TableEngineNotFoundSnafu {
engine_name: &table_info.meta.engine,
})?;
match engine
.open_table(&context, request)
.await
.with_context(|_| OpenTableSnafu {
table_info: format!("{catalog_name}.{schema_name}.{table_name}, id:{table_id}"),
})? {
Some(table) => {
info!(
"Table opened: {}.{}.{}",
catalog_name, schema_name, table_name
);
Ok(table)
}
None => {
info!(
"Try create table: {}.{}.{}",
catalog_name, schema_name, table_name
);
match engine
.open_table(&context, request)
.await
.with_context(|_| OpenTableSnafu {
table_info: format!("{catalog_name}.{schema_name}.{table_name}, id:{table_id}"),
})? {
Some(table) => {
info!(
"Table opened: {}.{}.{}",
catalog_name, schema_name, table_name
);
Ok(table)
}
None => {
info!(
"Try create table: {}.{}.{}",
catalog_name, schema_name, table_name
);

let meta = &table_info.meta;
let req = CreateTableRequest {
id: table_id,
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
desc: None,
schema: meta.schema.clone(),
region_numbers: region_numbers.clone(),
primary_key_indices: meta.primary_key_indices.clone(),
create_if_not_exists: true,
table_options: meta.options.clone(),
engine: engine.name().to_string(),
};

engine
.create_table(&context, req)
.await
.context(CreateTableSnafu {
table_info: format!(
"{}.{}.{}, id:{}",
&catalog_name, &schema_name, &table_name, table_id
),
})
}
let meta = &table_info.meta;
let req = CreateTableRequest {
id: table_id,
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
desc: None,
schema: meta.schema.clone(),
region_numbers: region_numbers.clone(),
primary_key_indices: meta.primary_key_indices.clone(),
create_if_not_exists: true,
table_options: meta.options.clone(),
engine: engine.name().to_string(),
};

engine
.create_table(&context, req)
.await
.context(CreateTableSnafu {
table_info: format!(
"{}.{}.{}, id:{}",
&catalog_name, &schema_name, &table_name, table_id
),
})
}
}
}
Expand Down Expand Up @@ -737,8 +756,8 @@ pub struct RemoteSchemaProvider {
schema_name: String,
node_id: u64,
backend: KvBackendRef,
tables: Arc<ArcSwap<HashMap<String, TableRef>>>,
mutex: Arc<Mutex<()>>,
tables: Arc<ArcSwap<DashMap<String, TableRef>>>,
mutex: Arc<KeyLock<String>>,
}

impl RemoteSchemaProvider {
Expand Down Expand Up @@ -775,11 +794,16 @@ impl SchemaProvider for RemoteSchemaProvider {
}

fn table_names(&self) -> Result<Vec<String>> {
Ok(self.tables.load().keys().cloned().collect::<Vec<_>>())
Ok(self
.tables
.load()
.iter()
.map(|en| en.key().clone())
.collect::<Vec<_>>())
}

async fn table(&self, name: &str) -> Result<Option<TableRef>> {
Ok(self.tables.load().get(name).cloned())
Ok(self.tables.load().get(name).map(|en| en.value().clone()))
}

fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
Expand All @@ -796,7 +820,7 @@ impl SchemaProvider for RemoteSchemaProvider {

let prev = std::thread::spawn(move || {
common_runtime::block_on_read(async move {
let _guard = mutex.lock().await;
let _guard = mutex.lock(table_key.clone()).await;
backend
.set(
table_key.as_bytes(),
Expand All @@ -808,11 +832,8 @@ impl SchemaProvider for RemoteSchemaProvider {
table_key, table_value
);

let prev_tables = tables.load();
let mut new_tables = HashMap::with_capacity(prev_tables.len() + 1);
new_tables.clone_from(&prev_tables);
let prev = new_tables.insert(name, table);
tables.store(Arc::new(new_tables));
let tables = tables.load();
let prev = tables.insert(name, table);
Ok(prev)
})
})
Expand All @@ -836,18 +857,15 @@ impl SchemaProvider for RemoteSchemaProvider {
let tables = self.tables.clone();
let prev = std::thread::spawn(move || {
common_runtime::block_on_read(async move {
let _guard = mutex.lock().await;
let _guard = mutex.lock(table_key.clone()).await;
backend.delete(table_key.as_bytes()).await?;
debug!(
"Successfully deleted catalog table entry, key: {}",
table_key
);

let prev_tables = tables.load();
let mut new_tables = HashMap::with_capacity(prev_tables.len() + 1);
new_tables.clone_from(&prev_tables);
let prev = new_tables.remove(&table_name);
tables.store(Arc::new(new_tables));
let tables = tables.load();
let prev = tables.remove(&table_name).map(|en| en.1);
Ok(prev)
})
})
Expand Down
2 changes: 2 additions & 0 deletions src/mito/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
dashmap = "5.4"
datafusion.workspace = true
datafusion-common.workspace = true
datatypes = { path = "../datatypes" }
futures.workspace = true
key-lock = "0.1"
log-store = { path = "../log-store" }
object-store = { path = "../object-store" }
serde = { version = "1.0", features = ["derive"] }
Expand Down
Loading

0 comments on commit 604ae9d

Please sign in to comment.