Skip to content

Commit

Permalink
test: refactor open table test and add close engine test
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Apr 11, 2023
1 parent 962ede9 commit 5784553
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 25 deletions.
2 changes: 2 additions & 0 deletions src/file-table-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod immutable;
#[cfg(test)]
mod tests;

use table::metadata::TableVersion;

const INIT_TABLE_VERSION: TableVersion = 0;
32 changes: 30 additions & 2 deletions src/file-table-engine/src/engine/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ impl TableEngine for ImmutableFileTableEngine {
}
}

#[cfg(test)]
impl ImmutableFileTableEngine {
pub async fn close_table(&self, table_ref: &TableReference<'_>) -> TableResult<()> {
self.inner.close_table(table_ref).await
}
}

impl ImmutableFileTableEngine {
pub fn new(config: EngineConfig, object_store: ObjectStore) -> Self {
ImmutableFileTableEngine {
Expand Down Expand Up @@ -344,15 +351,15 @@ impl EngineInner {
async fn close(&self) -> TableResult<()> {
let _lock = self.table_mutex.lock().await;

let mut tables = self.tables.write().unwrap().clone();
let tables = self.tables.read().unwrap().clone();

futures::future::try_join_all(tables.values().map(|t| t.close()))
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;

// Releases all closed table
tables.clear();
self.tables.write().unwrap().clear();

Ok(())
}
Expand All @@ -370,3 +377,24 @@ impl EngineInner {
.await
}
}

#[cfg(test)]
impl EngineInner {
pub async fn close_table(&self, table_ref: &TableReference<'_>) -> TableResult<()> {
let full_name = table_ref.to_string();

let _lock = self.table_mutex.lock().await;

if let Some(table) = self.get_table_by_full_name(&full_name) {
table
.close()
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
}

self.tables.write().unwrap().remove(&full_name);

Ok(())
}
}
60 changes: 37 additions & 23 deletions src/file-table-engine/src/engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use table::engine::{EngineContext, TableEngine, TableReference};
use table::requests::{AlterKind, AlterTableRequest, DropTableRequest, OpenTableRequest};
use table::{error as table_error, Table};

use crate::config::EngineConfig;
use crate::engine::immutable::ImmutableFileTableEngine;
use crate::manifest::immutable::manifest_path;
use crate::table::immutable::ImmutableFileTable;
use crate::test_util::{self, TestEngineComponents, TEST_TABLE_NAME};
Expand Down Expand Up @@ -60,49 +58,65 @@ async fn test_open_table() {
table_id: 1,
};

let (table, object_store, _dir) = {
let TestEngineComponents {
table_engine,
table_ref: table,
object_store,
dir,
..
} = test_util::setup_test_engine_and_table("test_open_table").await;
let table_ref = TableReference {
catalog: DEFAULT_CATALOG_NAME,
schema: DEFAULT_SCHEMA_NAME,
table: test_util::TEST_TABLE_NAME,
};

assert_eq!(IMMUTABLE_FILE_ENGINE, table_engine.name());
let TestEngineComponents {
table_engine,
table_ref: table,
dir: _dir,
..
} = test_util::setup_test_engine_and_table("test_open_table").await;

let reopened = table_engine
.open_table(&ctx, open_req.clone())
.await
.unwrap()
.unwrap();
assert_eq!(IMMUTABLE_FILE_ENGINE, table_engine.name());

assert_eq!(table.schema(), reopened.schema());
table_engine.close_table(&table_ref).await.unwrap();

(table, object_store, dir)
};

let table_engine = ImmutableFileTableEngine::new(EngineConfig::default(), object_store);
let reopened = table_engine
.open_table(&ctx, open_req.clone())
.await
.unwrap()
.unwrap();

assert_eq!(table.schema(), reopened.schema());

let reopened = reopened
.as_any()
.downcast_ref::<ImmutableFileTable>()
.unwrap();

let left = table.table_info();
let right = reopened.table_info();
assert_eq!(table.schema(), reopened.schema());

// assert recovered table_info is correct
assert_eq!(left, right);
}

#[tokio::test]
async fn test_close_all_table() {
common_telemetry::init_default_ut_logging();

let table_ref = TableReference {
catalog: DEFAULT_CATALOG_NAME,
schema: DEFAULT_SCHEMA_NAME,
table: test_util::TEST_TABLE_NAME,
};

let TestEngineComponents {
table_engine,
dir: _dir,
..
} = test_util::setup_test_engine_and_table("test_close_all_table").await;

table_engine.close().await.unwrap();

let exist = table_engine.table_exists(&EngineContext::default(), &table_ref);

assert!(!exist);
}

#[tokio::test]
async fn test_alter_table() {
common_telemetry::init_default_ut_logging();
Expand Down

0 comments on commit 5784553

Please sign in to comment.