Skip to content

Commit

Permalink
refactor: drop Table trait
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed Apr 7, 2024
1 parent 097a037 commit c34f9eb
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 94 deletions.
7 changes: 3 additions & 4 deletions src/catalog/src/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu};
use table::metadata::{
FilterPushDownType, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType,
};
use table::thin_table::{ThinTable, ThinTableAdapter};
use table::thin_table::ThinTable;
use table::TableRef;
pub use table_names::*;

Expand Down Expand Up @@ -187,10 +187,9 @@ impl InformationSchemaProvider {
self.information_table(name).map(|table| {
let table_info = Self::table_info(self.catalog_name.clone(), &table);
let filter_pushdown = FilterPushDownType::Inexact;
let thin_table = ThinTable::new(table_info, filter_pushdown);

let data_source = Arc::new(InformationTableDataSource::new(table));
Arc::new(ThinTableAdapter::new(thin_table, data_source)) as _
let thin_table = ThinTable::new(table_info, filter_pushdown, data_source);
Arc::new(thin_table)
})
}

Expand Down
11 changes: 4 additions & 7 deletions src/query/src/tests/time_range_filter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use store_api::storage::ScanRequest;
use table::metadata::FilterPushDownType;
use table::predicate::TimeRangePredicateBuilder;
use table::test_util::MemTable;
use table::thin_table::{ThinTable, ThinTableAdapter};
use table::thin_table::ThinTable;
use table::TableRef;

use crate::tests::exec_selection;
Expand All @@ -42,16 +42,13 @@ struct MemTableWrapper;
impl MemTableWrapper {
pub fn table(table: TableRef, filter: Arc<RwLock<Vec<Expr>>>) -> TableRef {
let table_info = table.table_info();
let thin_table_adapter = table.as_any().downcast_ref::<ThinTableAdapter>().unwrap();
let data_source = thin_table_adapter.data_source();

let thin_table = ThinTable::new(table_info, FilterPushDownType::Exact);
let data_source = table.data_source();
let data_source = Arc::new(DataSourceWrapper {
inner: data_source,
filter,
});

Arc::new(ThinTableAdapter::new(thin_table, data_source))
let thin_table = ThinTable::new(table_info, FilterPushDownType::Exact, data_source);
Arc::new(thin_table)
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/table/src/dist_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ use store_api::storage::ScanRequest;

use crate::error::UnsupportedSnafu;
use crate::metadata::{FilterPushDownType, TableInfoRef};
use crate::thin_table::{ThinTable, ThinTableAdapter};
use crate::thin_table::ThinTable;
use crate::TableRef;

#[derive(Clone)]
pub struct DistTable;

impl DistTable {
pub fn table(table_info: TableInfoRef) -> TableRef {
let thin_table = ThinTable::new(table_info, FilterPushDownType::Inexact);
let data_source = Arc::new(DummyDataSource);
Arc::new(ThinTableAdapter::new(thin_table, data_source))
let thin_table = ThinTable::new(table_info, FilterPushDownType::Inexact, data_source);
Arc::new(thin_table)
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/table/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ pub mod thin_table;

pub use crate::error::{Error, Result};
pub use crate::stats::{ColumnStatistics, TableStatistics};
pub use crate::table::{Table, TableRef};
pub use crate::table::TableRef;
pub use crate::thin_table::ThinTable;
45 changes: 7 additions & 38 deletions src/table/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,49 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod adapter;
mod metrics;
pub mod numbers;
pub mod scan;

use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use common_query::logical_plan::Expr;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use store_api::storage::ScanRequest;

use crate::error::Result;
use crate::metadata::{FilterPushDownType, TableId, TableInfoRef, TableType};

/// Table abstraction.
#[async_trait]
pub trait Table: Send + Sync {
/// Returns the table as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// Get a reference to the schema for this table
fn schema(&self) -> SchemaRef;
use crate::metadata::{TableId, TableType};
use crate::thin_table::ThinTable;

/// Get a reference to the table info.
fn table_info(&self) -> TableInfoRef;

/// Get the type of this table for metadata/catalog purposes.
fn table_type(&self) -> TableType;

async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;

/// Tests whether the table provider can make use of any or all filter expressions
/// to optimise data retrieval.
fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> {
Ok(vec![FilterPushDownType::Unsupported; filters.len()])
}
}
pub mod adapter;
mod metrics;
pub mod numbers;
pub mod scan;

pub type TableRef = Arc<dyn Table>;
pub type TableRef = Arc<ThinTable>;

#[async_trait::async_trait]
pub trait TableIdProvider {
Expand Down
7 changes: 4 additions & 3 deletions src/table/src/table/numbers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use store_api::storage::ScanRequest;
use crate::metadata::{
FilterPushDownType, TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType,
};
use crate::thin_table::{ThinTable, ThinTableAdapter};
use crate::thin_table::ThinTable;
use crate::TableRef;

const NUMBER_COLUMN: &str = "number";
Expand All @@ -49,12 +49,13 @@ impl NumbersTable {
}

pub fn table_with_name(table_id: TableId, name: String) -> TableRef {
let data_source = Arc::new(NumbersDataSource::new(Self::schema()));
let thin_table = ThinTable::new(
Self::table_info(table_id, name, "test_engine".to_string()),
FilterPushDownType::Unsupported,
data_source,
);
let data_source = Arc::new(NumbersDataSource::new(Self::schema()));
Arc::new(ThinTableAdapter::new(thin_table, data_source))
Arc::new(thin_table)
}

pub fn schema() -> SchemaRef {
Expand Down
10 changes: 7 additions & 3 deletions src/table/src/test_util/empty_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,22 @@ use store_api::data_source::DataSource;
use store_api::storage::ScanRequest;

use crate::metadata::{FilterPushDownType, TableInfo};
use crate::thin_table::{ThinTable, ThinTableAdapter};
use crate::thin_table::ThinTable;
use crate::TableRef;

pub struct EmptyTable;

impl EmptyTable {
pub fn from_table_info(info: &TableInfo) -> TableRef {
let thin_table = ThinTable::new(Arc::new(info.clone()), FilterPushDownType::Unsupported);
let data_source = Arc::new(EmptyDataSource {
schema: info.meta.schema.clone(),
});
Arc::new(ThinTableAdapter::new(thin_table, data_source))
let thin_table = ThinTable::new(
Arc::new(info.clone()),
FilterPushDownType::Unsupported,
data_source,
);
Arc::new(thin_table)
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/table/src/test_util/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::error::{SchemaConversionSnafu, TableProjectionSnafu, TablesRecordBatc
use crate::metadata::{
FilterPushDownType, TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion,
};
use crate::thin_table::{ThinTable, ThinTableAdapter};
use crate::thin_table::ThinTable;
use crate::TableRef;

pub struct MemTable;
Expand Down Expand Up @@ -94,9 +94,9 @@ impl MemTable {
.unwrap(),
);

let thin_table = ThinTable::new(info, FilterPushDownType::Unsupported);
let data_source = Arc::new(MemtableDataSource { recordbatch });
Arc::new(ThinTableAdapter::new(thin_table, data_source))
let thin_table = ThinTable::new(info, FilterPushDownType::Unsupported, data_source);
Arc::new(thin_table)
}

/// Creates a 1 column 100 rows table, with table name "numbers", column name "uint32s" and
Expand Down
48 changes: 16 additions & 32 deletions src/table/src/thin_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use async_trait::async_trait;
use common_query::prelude::Expr;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
Expand All @@ -24,64 +21,51 @@ use store_api::storage::ScanRequest;

use crate::error::{Result, TablesRecordBatchSnafu};
use crate::metadata::{FilterPushDownType, TableInfoRef, TableType};
use crate::Table;

/// The `ThinTable` struct will replace the `Table` trait.
/// TODO(zhongzc): After completion, perform renaming and documentation work.
pub struct ThinTable {
table_info: TableInfoRef,
filter_pushdown: FilterPushDownType,
data_source: DataSourceRef,
}

impl ThinTable {
pub fn new(table_info: TableInfoRef, filter_pushdown: FilterPushDownType) -> Self {
pub fn new(
table_info: TableInfoRef,
filter_pushdown: FilterPushDownType,
data_source: DataSourceRef,
) -> Self {
Self {
table_info,
filter_pushdown,
data_source,
}
}
}

pub struct ThinTableAdapter {
table: ThinTable,
data_source: DataSourceRef,
}

impl ThinTableAdapter {
pub fn new(table: ThinTable, data_source: DataSourceRef) -> Self {
Self { table, data_source }
}

pub fn data_source(&self) -> DataSourceRef {
self.data_source.clone()
}
}

#[async_trait]
impl Table for ThinTableAdapter {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.table.table_info.meta.schema.clone()
pub fn schema(&self) -> SchemaRef {
self.table_info.meta.schema.clone()
}

fn table_info(&self) -> TableInfoRef {
self.table.table_info.clone()
pub fn table_info(&self) -> TableInfoRef {
self.table_info.clone()
}

fn table_type(&self) -> TableType {
self.table.table_info.table_type
pub fn table_type(&self) -> TableType {
self.table_info.table_type
}

async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
pub async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
self.data_source
.get_stream(request)
.context(TablesRecordBatchSnafu)
}

fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> {
Ok(vec![self.table.filter_pushdown; filters.len()])
pub fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> {
Ok(vec![self.filter_pushdown; filters.len()])
}
}

0 comments on commit c34f9eb

Please sign in to comment.