Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: drop Table trait #3654

Merged
merged 6 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions src/catalog/src/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu};
use table::metadata::{
FilterPushDownType, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType,
};
use table::thin_table::{ThinTable, ThinTableAdapter};
use table::TableRef;
use table::{Table, TableRef};
pub use table_names::*;

use self::columns::InformationSchemaColumns;
Expand Down Expand Up @@ -187,10 +186,9 @@ impl InformationSchemaProvider {
self.information_table(name).map(|table| {
let table_info = Self::table_info(self.catalog_name.clone(), &table);
let filter_pushdown = FilterPushDownType::Inexact;
let thin_table = ThinTable::new(table_info, filter_pushdown);

let data_source = Arc::new(InformationTableDataSource::new(table));
Arc::new(ThinTableAdapter::new(thin_table, data_source)) as _
let table = Table::new(table_info, filter_pushdown, data_source);
Arc::new(table)
})
}

Expand Down
12 changes: 4 additions & 8 deletions src/query/src/tests/time_range_filter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ use store_api::storage::ScanRequest;
use table::metadata::FilterPushDownType;
use table::predicate::TimeRangePredicateBuilder;
use table::test_util::MemTable;
use table::thin_table::{ThinTable, ThinTableAdapter};
use table::TableRef;
use table::{Table, TableRef};

use crate::tests::exec_selection;
use crate::{QueryEngineFactory, QueryEngineRef};
Expand All @@ -42,16 +41,13 @@ struct MemTableWrapper;
impl MemTableWrapper {
pub fn table(table: TableRef, filter: Arc<RwLock<Vec<Expr>>>) -> TableRef {
let table_info = table.table_info();
let thin_table_adapter = table.as_any().downcast_ref::<ThinTableAdapter>().unwrap();
let data_source = thin_table_adapter.data_source();

let thin_table = ThinTable::new(table_info, FilterPushDownType::Exact);
let data_source = table.data_source();
let data_source = Arc::new(DataSourceWrapper {
inner: data_source,
filter,
});

Arc::new(ThinTableAdapter::new(thin_table, data_source))
let table = Table::new(table_info, FilterPushDownType::Exact, data_source);
Arc::new(table)
}
}

Expand Down
7 changes: 3 additions & 4 deletions src/table/src/dist_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@ use store_api::storage::ScanRequest;

use crate::error::UnsupportedSnafu;
use crate::metadata::{FilterPushDownType, TableInfoRef};
use crate::thin_table::{ThinTable, ThinTableAdapter};
use crate::TableRef;
use crate::{Table, TableRef};

#[derive(Clone)]
pub struct DistTable;

impl DistTable {
pub fn table(table_info: TableInfoRef) -> TableRef {
let thin_table = ThinTable::new(table_info, FilterPushDownType::Inexact);
let data_source = Arc::new(DummyDataSource);
Arc::new(ThinTableAdapter::new(thin_table, data_source))
let table = Table::new(table_info, FilterPushDownType::Inexact, data_source);
Arc::new(table)
}
}

Expand Down
1 change: 0 additions & 1 deletion src/table/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ pub mod stats;
pub mod table;
pub mod table_reference;
pub mod test_util;
pub mod thin_table;

pub use crate::error::{Error, Result};
pub use crate::stats::{ColumnStatistics, TableStatistics};
Expand Down
87 changes: 57 additions & 30 deletions src/table/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,53 +12,80 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod adapter;
mod metrics;
pub mod numbers;
pub mod scan;

use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use common_query::logical_plan::Expr;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use snafu::ResultExt;
use store_api::data_source::DataSourceRef;
use store_api::storage::ScanRequest;

use crate::error::Result;
use crate::error::{Result, TablesRecordBatchSnafu};
use crate::metadata::{FilterPushDownType, TableId, TableInfoRef, TableType};

/// Table abstraction.
#[async_trait]
pub trait Table: Send + Sync {
/// Returns the table as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
pub mod adapter;
mod metrics;
pub mod numbers;
pub mod scan;

pub type TableRef = Arc<Table>;

/// Get a reference to the schema for this table
fn schema(&self) -> SchemaRef;
#[async_trait::async_trait]
pub trait TableIdProvider {
async fn next_table_id(&self) -> Result<TableId>;
}

pub type TableIdProviderRef = Arc<dyn TableIdProvider + Send + Sync>;

/// Table handle.
pub struct Table {
table_info: TableInfoRef,
filter_pushdown: FilterPushDownType,
data_source: DataSourceRef,
}

impl Table {
pub fn new(
table_info: TableInfoRef,
filter_pushdown: FilterPushDownType,
data_source: DataSourceRef,
) -> Self {
Self {
table_info,
filter_pushdown,
data_source,
}
}

pub fn data_source(&self) -> DataSourceRef {
self.data_source.clone()
}

/// Get a reference to the schema for this table.
pub fn schema(&self) -> SchemaRef {
self.table_info.meta.schema.clone()
}

/// Get a reference to the table info.
fn table_info(&self) -> TableInfoRef;
pub fn table_info(&self) -> TableInfoRef {
self.table_info.clone()
}

/// Get the type of this table for metadata/catalog purposes.
fn table_type(&self) -> TableType;
pub fn table_type(&self) -> TableType {
self.table_info.table_type
}

async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
pub async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
self.data_source
.get_stream(request)
.context(TablesRecordBatchSnafu)
}

/// Tests whether the table provider can make use of any or all filter expressions
/// to optimise data retrieval.
fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> {
Ok(vec![FilterPushDownType::Unsupported; filters.len()])
pub fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> {
Ok(vec![self.filter_pushdown; filters.len()])
}
}

pub type TableRef = Arc<dyn Table>;

#[async_trait::async_trait]
pub trait TableIdProvider {
async fn next_table_id(&self) -> Result<TableId>;
}

pub type TableIdProviderRef = Arc<dyn TableIdProvider + Send + Sync>;
10 changes: 5 additions & 5 deletions src/table/src/table/numbers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ use store_api::storage::ScanRequest;
use crate::metadata::{
FilterPushDownType, TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType,
};
use crate::thin_table::{ThinTable, ThinTableAdapter};
use crate::TableRef;
use crate::{Table, TableRef};

const NUMBER_COLUMN: &str = "number";

Expand All @@ -49,12 +48,13 @@ impl NumbersTable {
}

pub fn table_with_name(table_id: TableId, name: String) -> TableRef {
let thin_table = ThinTable::new(
let data_source = Arc::new(NumbersDataSource::new(Self::schema()));
let table = Table::new(
Self::table_info(table_id, name, "test_engine".to_string()),
FilterPushDownType::Unsupported,
data_source,
);
let data_source = Arc::new(NumbersDataSource::new(Self::schema()));
Arc::new(ThinTableAdapter::new(thin_table, data_source))
Arc::new(table)
}

pub fn schema() -> SchemaRef {
Expand Down
11 changes: 7 additions & 4 deletions src/table/src/test_util/empty_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@ use store_api::data_source::DataSource;
use store_api::storage::ScanRequest;

use crate::metadata::{FilterPushDownType, TableInfo};
use crate::thin_table::{ThinTable, ThinTableAdapter};
use crate::TableRef;
use crate::{Table, TableRef};

pub struct EmptyTable;

impl EmptyTable {
pub fn from_table_info(info: &TableInfo) -> TableRef {
let thin_table = ThinTable::new(Arc::new(info.clone()), FilterPushDownType::Unsupported);
let data_source = Arc::new(EmptyDataSource {
schema: info.meta.schema.clone(),
});
Arc::new(ThinTableAdapter::new(thin_table, data_source))
let table = Table::new(
Arc::new(info.clone()),
FilterPushDownType::Unsupported,
data_source,
);
Arc::new(table)
}
}

Expand Down
7 changes: 3 additions & 4 deletions src/table/src/test_util/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ use crate::error::{SchemaConversionSnafu, TableProjectionSnafu, TablesRecordBatc
use crate::metadata::{
FilterPushDownType, TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion,
};
use crate::thin_table::{ThinTable, ThinTableAdapter};
use crate::TableRef;
use crate::{Table, TableRef};

pub struct MemTable;

Expand Down Expand Up @@ -94,9 +93,9 @@ impl MemTable {
.unwrap(),
);

let thin_table = ThinTable::new(info, FilterPushDownType::Unsupported);
let data_source = Arc::new(MemtableDataSource { recordbatch });
Arc::new(ThinTableAdapter::new(thin_table, data_source))
let table = Table::new(info, FilterPushDownType::Unsupported, data_source);
Arc::new(table)
}

/// Creates a 1 column 100 rows table, with table name "numbers", column name "uint32s" and
Expand Down
87 changes: 0 additions & 87 deletions src/table/src/thin_table.rs

This file was deleted.