Skip to content

Commit

Permalink
chore: replace ceresdb with horaedb (#1338)
Browse files Browse the repository at this point in the history
## Rationale
related #1319

## Detailed Changes
replace ceresdb with horaedb in code

## Test Plan
CI

---------

Co-authored-by: tison <wander4096@gmail.com>
  • Loading branch information
jackwener and tisonkun authored Dec 4, 2023
1 parent 378a34a commit fa3ea0e
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 36 deletions.
2 changes: 1 addition & 1 deletion integration_tests/cases/common/dml/issue-1087.result
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ explain verbose select * from issue_1087;

plan_type,plan,
String("initial_logical_plan"),String("Projection: issue_1087.tsid, issue_1087.t, issue_1087.name, issue_1087.value\n TableScan: issue_1087"),
String("logical_plan after ceresdb_type_conversion"),String("SAME TEXT AS ABOVE"),
String("logical_plan after horaedb_type_conversion"),String("SAME TEXT AS ABOVE"),
String("logical_plan after inline_table_scan"),String("SAME TEXT AS ABOVE"),
String("logical_plan after type_coercion"),String("SAME TEXT AS ABOVE"),
String("logical_plan after count_wildcard_rule"),String("SAME TEXT AS ABOVE"),
Expand Down
14 changes: 7 additions & 7 deletions integration_tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ const CASE_ROOT_PATH_ENV: &str = "HORAEDB_TEST_CASE_PATH";
const ENV_FILTER_ENV: &str = "HORAEDB_ENV_FILTER";
const RUN_MODE: &str = "HORAEDB_INTEGRATION_TEST_BIN_RUN_MODE";

struct CeresDBController;
struct UntypedCeresDB {
struct HoraeDBController;
struct UntypedHoraeDB {
db: DbRef,
}

Expand All @@ -47,15 +47,15 @@ impl<T: Backend + Send + Sync> StoppableDatabase for HoraeDB<T> {
}

#[async_trait]
impl Database for UntypedCeresDB {
impl Database for UntypedHoraeDB {
async fn query(&self, context: QueryContext, query: String) -> Box<dyn Display> {
self.db.query(context, query).await
}
}

#[async_trait]
impl EnvController for CeresDBController {
type DB = UntypedCeresDB;
impl EnvController for HoraeDBController {
type DB = UntypedHoraeDB;

async fn start(&self, env: &str, _config: Option<&Path>) -> Self::DB {
println!("start with env {env}");
Expand All @@ -65,7 +65,7 @@ impl EnvController for CeresDBController {
_ => panic!("invalid env {env}"),
};

UntypedCeresDB { db }
UntypedHoraeDB { db }
}

async fn stop(&self, env: &str, mut database: Self::DB) {
Expand All @@ -76,7 +76,7 @@ impl EnvController for CeresDBController {

#[tokio::main]
async fn main() -> Result<()> {
let controller = CeresDBController;
let controller = HoraeDBController;
let run_mode = env::var(RUN_MODE).unwrap_or_else(|_| "sql_test".to_string());

match run_mode.as_str() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl AnalyzerRule for TypeConversion {
}

fn name(&self) -> &str {
"ceresdb_type_conversion"
"horaedb_type_conversion"
}
}

Expand Down
9 changes: 3 additions & 6 deletions query_engine/src/datafusion_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion::{
prelude::{SessionConfig, SessionContext},
};
use df_engine_extensions::codec::PhysicalExtensionCodecImpl;
use table_engine::{provider::CeresdbOptions, remote::RemoteEngineRef};
use table_engine::{provider::HoraeDBOptions, remote::RemoteEngineRef};

use crate::{
context::Context,
Expand Down Expand Up @@ -139,7 +139,7 @@ impl DfContextBuilder {
let timeout = ctx
.deadline
.map(|deadline| deadline.duration_since(Instant::now()).as_millis() as u64);
let ceresdb_options = CeresdbOptions {
let options = HoraeDBOptions {
request_id: ctx.request_id.as_u64(),
request_timeout: timeout,
default_catalog: ctx.default_catalog.clone(),
Expand All @@ -152,10 +152,7 @@ impl DfContextBuilder {
)
.with_target_partitions(self.config.read_parallelism);

df_session_config
.options_mut()
.extensions
.insert(ceresdb_options);
df_session_config.options_mut().extensions.insert(options);

// Using default logcial optimizer, if want to add more custom rule, using
// `add_optimizer_rule` to add.
Expand Down
18 changes: 9 additions & 9 deletions query_engine/src/datafusion_impl/task_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use generic_error::BoxError;
use prost::Message;
use snafu::ResultExt;
use table_engine::{
provider::{CeresdbOptions, ScanTable},
provider::{HoraeDBOptions, ScanTable},
remote::{
model::{
ExecContext, ExecutePlanRequest, PhysicalPlan, RemoteExecuteRequest, TableIdentifier,
Expand Down Expand Up @@ -177,19 +177,19 @@ impl RemotePhysicalPlanExecutor for RemotePhysicalPlanExecutorImpl {
plan: Arc<dyn ExecutionPlan>,
) -> DfResult<BoxFuture<'static, DfResult<SendableRecordBatchStream>>> {
// Get the custom context to rebuild execution context.
let ceresdb_options = task_context
let options = task_context
.session_config()
.options()
.extensions
.get::<CeresdbOptions>();
assert!(ceresdb_options.is_some());
let ceresdb_options = ceresdb_options.unwrap();
let request_id = RequestId::from(ceresdb_options.request_id);
let deadline = ceresdb_options
.get::<HoraeDBOptions>();
assert!(options.is_some());
let options = options.unwrap();
let request_id = RequestId::from(options.request_id);
let deadline = options
.request_timeout
.map(|n| Instant::now() + Duration::from_millis(n));
let default_catalog = ceresdb_options.default_catalog.clone();
let default_schema = ceresdb_options.default_schema.clone();
let default_catalog = options.default_catalog.clone();
let default_schema = options.default_schema.clone();

let display_plan = DisplayableExecutionPlan::new(plan.as_ref());
let exec_ctx = ExecContext {
Expand Down
8 changes: 4 additions & 4 deletions query_frontend/src/influxql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ impl<'a, P: MetaProvider> SchemaProvider for InfluxQLSchemaProvider<'a, P> {
}
};

let ceresdb_arrow_schema = table_source.schema();
let horaedb_arrow_schema = table_source.schema();
let influxql_schema = match convert_to_influxql_schema(table_source.schema()) {
Ok(schema) => schema,
Err(e) => {
// Same as above here.
error!("Influxql planner failed to convert schema to influxql schema, schema:{ceresdb_arrow_schema}, err:{e}");
error!("Influxql planner failed to convert schema to influxql schema, schema:{horaedb_arrow_schema}, err:{e}");
return None;
}
};
Expand Down Expand Up @@ -133,8 +133,8 @@ pub(crate) struct Planner<'a, P: MetaProvider> {
schema_provider: InfluxQLSchemaProvider<'a, P>,
}

fn convert_to_influxql_schema(ceresdb_arrow_schema: ArrowSchemaRef) -> Result<Schema> {
ceresdb_schema_to_influxdb(ceresdb_arrow_schema)
fn convert_to_influxql_schema(horaedb_arrow_schema: ArrowSchemaRef) -> Result<Schema> {
ceresdb_schema_to_influxdb(horaedb_arrow_schema)
.box_err()
.and_then(|s| Schema::try_from(s).box_err())
.context(BuildPlanWithCause {
Expand Down
16 changes: 8 additions & 8 deletions table_engine/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,18 @@ use crate::{
const SCAN_TABLE_METRICS_COLLECTOR_NAME: &str = "scan_table";

#[derive(Clone, Debug)]
pub struct CeresdbOptions {
pub struct HoraeDBOptions {
pub request_id: u64,
pub request_timeout: Option<u64>,
pub default_schema: String,
pub default_catalog: String,
}

impl ConfigExtension for CeresdbOptions {
impl ConfigExtension for HoraeDBOptions {
const PREFIX: &'static str = "horaedb";
}

impl ExtensionOptions for CeresdbOptions {
impl ExtensionOptions for HoraeDBOptions {
fn as_any(&self) -> &dyn Any {
self
}
Expand Down Expand Up @@ -179,11 +179,11 @@ impl<B: TableScanBuilder> TableProviderAdapter<B> {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let ceresdb_options = state.config_options().extensions.get::<CeresdbOptions>();
assert!(ceresdb_options.is_some());
let ceresdb_options = ceresdb_options.unwrap();
let request_id = RequestId::from(ceresdb_options.request_id);
let deadline = ceresdb_options
let options = state.config_options().extensions.get::<HoraeDBOptions>();
assert!(options.is_some());
let options = options.unwrap();
let request_id = RequestId::from(options.request_id);
let deadline = options
.request_timeout
.map(|n| Instant::now() + Duration::from_millis(n));
let read_parallelism = state.config().target_partitions();
Expand Down

1 comment on commit fa3ea0e

@tematou
Copy link

@tematou tematou commented on fa3ea0e Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change ceresdb to horaedb?

Please sign in to comment.