Skip to content

Commit

Permalink
move table engine proxy to table_engine crate.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Mar 22, 2023
1 parent d8f652b commit 7103d3e
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 54 deletions.
3 changes: 2 additions & 1 deletion catalog_impls/src/table_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -912,9 +912,10 @@ mod tests {
schema::{CreateOptions, CreateTableRequest, DropOptions, DropTableRequest, SchemaRef},
};
use common_types::table::{DEFAULT_CLUSTER_VERSION, DEFAULT_SHARD_ID};
use server::table_engine::{MemoryTableEngine, TableEngineProxy};
use table_engine::{
engine::{TableEngineRef, TableState},
memory::MemoryTableEngine,
proxy::TableEngineProxy,
ANALYTIC_ENGINE_TYPE,
};

Expand Down
2 changes: 2 additions & 0 deletions server/src/handlers/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ fn convert_influx_value(field_value: FieldValue) -> Value {
Value { value: Some(v) }
}

// fn convert_query_result(output: Output)

// TODO: Request and response type don't match influxdb's API now.
pub async fn query<Q: QueryExecutor + 'static>(
ctx: RequestContext,
Expand Down
1 change: 0 additions & 1 deletion server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,3 @@ mod metrics;
mod mysql;
pub mod schema_config_provider;
pub mod server;
pub mod table_engine;
3 changes: 1 addition & 2 deletions src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ use server::{
cluster_based::ClusterBasedProvider, config_based::ConfigBasedProvider,
},
server::Builder,
table_engine::{MemoryTableEngine, TableEngineProxy},
};
use table_engine::engine::EngineRuntimes;
use table_engine::{engine::EngineRuntimes, memory::MemoryTableEngine, proxy::TableEngineProxy};
use tracing_util::{
self,
tracing_appender::{non_blocking::WorkerGuard, rolling::Rotation},
Expand Down
1 change: 1 addition & 0 deletions table_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod memory;
pub mod partition;
pub mod predicate;
pub mod provider;
pub mod proxy;
pub mod remote;
pub mod stream;
pub mod table;
Expand Down
47 changes: 45 additions & 2 deletions table_engine/src/memory.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

//! In-memory table implementations
//! In-memory table engine implementations

use std::{
collections::HashMap,
Expand All @@ -23,14 +23,18 @@ use futures::stream::Stream;
use snafu::{OptionExt, ResultExt};

use crate::{
engine::{
CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, TableEngine,
},
stream::{
self, ErrNoSource, ErrWithSource, PartitionedStreams, RecordBatchStream,
SendableRecordBatchStream,
},
table::{
AlterSchemaRequest, FlushRequest, GetRequest, ReadRequest, Result, Table, TableId,
TableStats, UnsupportedMethod, WriteRequest,
TableRef, TableStats, UnsupportedMethod, WriteRequest,
},
MEMORY_ENGINE_TYPE,
};

type RowGroupVec = Vec<RowGroup>;
Expand Down Expand Up @@ -250,3 +254,42 @@ fn build_column_block<'a, I: Iterator<Item = &'a Datum>>(
}
Ok(builder.build())
}

/// Memory table engine implementation
// Mainly for test purpose now
pub struct MemoryTableEngine;

#[async_trait]
impl TableEngine for MemoryTableEngine {
fn engine_type(&self) -> &str {
MEMORY_ENGINE_TYPE
}

async fn close(&self) -> crate::engine::Result<()> {
Ok(())
}

async fn create_table(&self, request: CreateTableRequest) -> crate::engine::Result<TableRef> {
Ok(Arc::new(MemoryTable::new(
request.table_name,
request.table_id,
request.table_schema,
MEMORY_ENGINE_TYPE.to_string(),
)))
}

async fn drop_table(&self, _request: DropTableRequest) -> crate::engine::Result<bool> {
Ok(true)
}

async fn open_table(
&self,
_request: OpenTableRequest,
) -> crate::engine::Result<Option<TableRef>> {
Ok(None)
}

async fn close_table(&self, _request: CloseTableRequest) -> crate::engine::Result<()> {
Ok(())
}
}
62 changes: 14 additions & 48 deletions server/src/table_engine.rs → table_engine/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,56 +1,19 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

//! Table engine implementation

use std::sync::Arc;
//! Table engine proxy

use async_trait::async_trait;
use table_engine::{

use crate::{
engine::{
CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, Result,
TableEngine, TableEngineRef, UnknownEngineType,
CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, TableEngine,
TableEngineRef, UnknownEngineType,
},
memory::MemoryTable,
memory::MemoryTableEngine,
table::TableRef,
ANALYTIC_ENGINE_TYPE, MEMORY_ENGINE_TYPE,
};

/// Memory table engine implementation
// Mainly for test purpose now
pub struct MemoryTableEngine;

#[async_trait]
impl TableEngine for MemoryTableEngine {
fn engine_type(&self) -> &str {
MEMORY_ENGINE_TYPE
}

async fn close(&self) -> Result<()> {
Ok(())
}

async fn create_table(&self, request: CreateTableRequest) -> Result<TableRef> {
Ok(Arc::new(MemoryTable::new(
request.table_name,
request.table_id,
request.table_schema,
MEMORY_ENGINE_TYPE.to_string(),
)))
}

async fn drop_table(&self, _request: DropTableRequest) -> Result<bool> {
Ok(true)
}

async fn open_table(&self, _request: OpenTableRequest) -> Result<Option<TableRef>> {
Ok(None)
}

async fn close_table(&self, _request: CloseTableRequest) -> Result<()> {
Ok(())
}
}

/// Route [CreateTableRequest] to the correct engine by its engine type
pub struct TableEngineProxy {
/// Memory table engine
Expand All @@ -65,14 +28,14 @@ impl TableEngine for TableEngineProxy {
"TableEngineProxy"
}

async fn close(&self) -> Result<()> {
async fn close(&self) -> crate::engine::Result<()> {
self.memory.close().await?;
self.analytic.close().await?;

Ok(())
}

async fn create_table(&self, request: CreateTableRequest) -> Result<TableRef> {
async fn create_table(&self, request: CreateTableRequest) -> crate::engine::Result<TableRef> {
// TODO(yingwen): Use a map
match request.engine.as_str() {
MEMORY_ENGINE_TYPE => self.memory.create_table(request).await,
Expand All @@ -81,7 +44,7 @@ impl TableEngine for TableEngineProxy {
}
}

async fn drop_table(&self, request: DropTableRequest) -> Result<bool> {
async fn drop_table(&self, request: DropTableRequest) -> crate::engine::Result<bool> {
match request.engine.as_str() {
MEMORY_ENGINE_TYPE => self.memory.drop_table(request).await,
ANALYTIC_ENGINE_TYPE => self.analytic.drop_table(request).await,
Expand All @@ -90,7 +53,10 @@ impl TableEngine for TableEngineProxy {
}

/// Open table, return error if table not exists
async fn open_table(&self, request: OpenTableRequest) -> Result<Option<TableRef>> {
async fn open_table(
&self,
request: OpenTableRequest,
) -> crate::engine::Result<Option<TableRef>> {
match request.engine.as_str() {
MEMORY_ENGINE_TYPE => self.memory.open_table(request).await,
ANALYTIC_ENGINE_TYPE => self.analytic.open_table(request).await,
Expand All @@ -99,7 +65,7 @@ impl TableEngine for TableEngineProxy {
}

/// Close table, it is ok to close a closed table.
async fn close_table(&self, request: CloseTableRequest) -> Result<()> {
async fn close_table(&self, request: CloseTableRequest) -> crate::engine::Result<()> {
match request.engine.as_str() {
MEMORY_ENGINE_TYPE => self.memory.close_table(request).await,
ANALYTIC_ENGINE_TYPE => self.analytic.close_table(request).await,
Expand Down

0 comments on commit 7103d3e

Please sign in to comment.