diff --git a/catalog/src/lib.rs b/catalog/src/lib.rs index 000e2a9225..81133e2573 100644 --- a/catalog/src/lib.rs +++ b/catalog/src/lib.rs @@ -53,8 +53,10 @@ pub enum Error { #[snafu(display("Failed to operate table, msg:{}, err:{}", msg, source))] TableOperatorWithCause { msg: String, source: GenericError }, - #[snafu(display("Failed to operate table, msg:{}.\nBacktrace:\n{}", msg, backtrace))] - TableOperatorNoCause { msg: String, backtrace: Backtrace }, + // Fixme: Temporarily remove the stack information, otherwise you will encounter a + // segmentation fault. + #[snafu(display("Failed to operate table, msg:{}.\n", msg))] + TableOperatorNoCause { msg: String }, } define_result!(Error); diff --git a/catalog/src/table_operator.rs b/catalog/src/table_operator.rs index c621c9c8cd..35a6058c43 100644 --- a/catalog/src/table_operator.rs +++ b/catalog/src/table_operator.rs @@ -101,7 +101,7 @@ impl TableOperator { } else { TableOperatorNoCause { msg: format!( - "Failed to open shard, some tables open failed, no table is shard id:{shard_id}, opened count:{no_table_count}, open error count:{open_err_count}"), + "Failed to open shard, some tables open failed, shard id:{shard_id}, no table is opened count:{no_table_count}, open error count:{open_err_count}"), }.fail() } } diff --git a/proxy/src/handlers/error.rs b/proxy/src/handlers/error.rs index 67ce26095c..7ec6ffb51a 100644 --- a/proxy/src/handlers/error.rs +++ b/proxy/src/handlers/error.rs @@ -2,7 +2,7 @@ //! Error of handlers -use common_util::{define_result, error::GenericError}; +use common_util::define_result; use snafu::{Backtrace, Snafu}; use warp::reject::Reject; @@ -18,11 +18,6 @@ pub enum Error { source: query_frontend::frontend::Error, }, - #[snafu(display("Failed to parse influxql, err:{}", source))] - ParseInfluxql { - source: query_frontend::frontend::Error, - }, - #[snafu(display("Failed to create plan, query:{}, err:{}", query, source))] CreatePlan { query: String, @@ -77,12 +72,6 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("InfluxDb handler failed, msg:{}, source:{}", msg, source))] - InfluxDbHandlerWithCause { msg: String, source: GenericError }, - - #[snafu(display("InfluxDb handler failed, msg:{}.\nBacktrace:\n{}", msg, backtrace))] - InfluxDbHandlerNoCause { msg: String, backtrace: Backtrace }, - #[snafu(display("Route handler failed, table:{:?}, source:{}", table, source))] RouteHandler { table: String, diff --git a/proxy/src/handlers/mod.rs b/proxy/src/handlers/mod.rs index 6c59404de0..da1c6e9996 100644 --- a/proxy/src/handlers/mod.rs +++ b/proxy/src/handlers/mod.rs @@ -4,7 +4,6 @@ pub mod admin; pub(crate) mod error; -pub mod influxdb; pub mod query; pub mod route; diff --git a/proxy/src/handlers/query.rs b/proxy/src/handlers/query.rs index 7e3abed572..b3140621f8 100644 --- a/proxy/src/handlers/query.rs +++ b/proxy/src/handlers/query.rs @@ -23,10 +23,7 @@ use serde::{ use snafu::{ensure, ResultExt}; use crate::handlers::{ - error::{ - CreatePlan, InterpreterExec, ParseInfluxql, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt, - }, - influxdb::InfluxqlRequest, + error::{CreatePlan, InterpreterExec, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt}, prelude::*, }; @@ -101,14 +98,11 @@ impl From for Request { #[derive(Debug)] pub enum QueryRequest { Sql(Request), - // TODO: influxql include more parameters, we should add it in later. - Influxql(InfluxqlRequest), } impl QueryRequest { pub fn query(&self) -> &str { match self { QueryRequest::Sql(request) => request.query.as_str(), - QueryRequest::Influxql(request) => request.query.as_str(), } } } @@ -168,30 +162,6 @@ pub async fn handle_query( query: &request.query, })? } - - QueryRequest::Influxql(request) => { - let mut stmts = frontend - .parse_influxql(&mut sql_ctx, &request.query) - .context(ParseInfluxql)?; - - if stmts.is_empty() { - return Ok(Output::AffectedRows(0)); - } - - ensure!( - stmts.len() == 1, - TooMuchStmt { - len: stmts.len(), - query: &request.query, - } - ); - - frontend - .influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0)) - .context(CreatePlan { - query: &request.query, - })? - } }; instance.limiter.try_limit(&plan).context(QueryBlock { diff --git a/proxy/src/http/mod.rs b/proxy/src/http/mod.rs index ec02939c33..631330d49a 100644 --- a/proxy/src/http/mod.rs +++ b/proxy/src/http/mod.rs @@ -1,4 +1,4 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. pub mod prom; -pub mod query; +pub mod sql; diff --git a/proxy/src/http/query.rs b/proxy/src/http/sql.rs similarity index 64% rename from proxy/src/http/query.rs rename to proxy/src/http/sql.rs index bc93be6963..3e562c0c7b 100644 --- a/proxy/src/http/query.rs +++ b/proxy/src/http/sql.rs @@ -33,15 +33,14 @@ use crate::{ error::{ErrNoCause, ErrWithCause, Internal, InternalNoCause, Result}, execute_plan, forward::ForwardResult, - handlers::influxdb::InfluxqlRequest, Proxy, }; impl Proxy { - pub async fn handle_query( + pub async fn handle_http_sql_query( &self, ctx: &RequestContext, - query_request: QueryRequest, + req: Request, ) -> Result { let request_id = RequestId::next_id(); let begin_instant = Instant::now(); @@ -49,7 +48,7 @@ impl Proxy { info!( "Query handler try to process request, request_id:{}, request:{:?}", - request_id, query_request + request_id, req ); // TODO(yingwen): Privilege check, cannot access data of other tenant @@ -63,110 +62,66 @@ impl Proxy { let frontend = Frontend::new(provider); let mut sql_ctx = SqlContext::new(request_id, deadline); - let plan = match &query_request { - QueryRequest::Sql(request) => { - // Parse sql, frontend error of invalid sql already contains sql - // TODO(yingwen): Maybe move sql from frontend error to outer error - let mut stmts = frontend - .parse_sql(&mut sql_ctx, &request.query) - .box_err() - .with_context(|| ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: format!("Failed to parse sql, query:{}", request.query), - })?; - - if stmts.is_empty() { - return Ok(Output::AffectedRows(0)); - } - - // TODO(yingwen): For simplicity, we only support executing one statement now - // TODO(yingwen): INSERT/UPDATE/DELETE can be batched - ensure!( - stmts.len() == 1, - ErrNoCause { - code: StatusCode::BAD_REQUEST, - msg: format!( - "Only support execute one statement now, current num:{}, query:{}.", - stmts.len(), - request.query - ), - } - ); - - let sql_query_request = SqlQueryRequest { - context: Some(GrpcRequestContext { - database: ctx.schema.clone(), - }), - tables: vec![], - sql: request.query.clone(), - }; - - if let Some(resp) = self.maybe_forward_sql_query(&sql_query_request).await? { - match resp { - ForwardResult::Forwarded(resp) => { - return convert_sql_response_to_output(resp?) - } - ForwardResult::Local => (), - } - }; - - // Open partition table if needed. - let table_name = frontend::parse_table_name(&stmts); - if let Some(table_name) = table_name { - self.maybe_open_partition_table_if_not_exist( - &ctx.catalog, - &ctx.schema, - &table_name, - ) - .await?; - } - - // Create logical plan - // Note: Remember to store sql in error when creating logical plan - frontend - .statement_to_plan(&mut sql_ctx, stmts.remove(0)) - .box_err() - .with_context(|| ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: format!("Failed to build plan, query:{}", request.query), - })? + // Parse sql, frontend error of invalid sql already contains sql + // TODO(yingwen): Maybe move sql from frontend error to outer error + let mut stmts = frontend + .parse_sql(&mut sql_ctx, &req.query) + .box_err() + .with_context(|| ErrWithCause { + code: StatusCode::BAD_REQUEST, + msg: format!("Failed to parse sql, query:{}", req.query), + })?; + + if stmts.is_empty() { + return Ok(Output::AffectedRows(0)); + } + + // TODO(yingwen): For simplicity, we only support executing one statement now + // TODO(yingwen): INSERT/UPDATE/DELETE can be batched + ensure!( + stmts.len() == 1, + ErrNoCause { + code: StatusCode::BAD_REQUEST, + msg: format!( + "Only support execute one statement now, current num:{}, query:{}.", + stmts.len(), + req.query + ), } + ); + + let sql_query_request = SqlQueryRequest { + context: Some(GrpcRequestContext { + database: ctx.schema.clone(), + }), + tables: vec![], + sql: req.query.clone(), + }; - QueryRequest::Influxql(request) => { - let mut stmts = frontend - .parse_influxql(&mut sql_ctx, &request.query) - .box_err() - .with_context(|| ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: format!("Failed to parse influxql, query:{}", request.query), - })?; - - if stmts.is_empty() { - return Ok(Output::AffectedRows(0)); - } - - ensure!( - stmts.len() == 1, - ErrNoCause { - code: StatusCode::BAD_REQUEST, - msg: format!( - "Only support execute one statement now, current num:{}, query:{}.", - stmts.len(), - request.query - ), - } - ); - - frontend - .influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0)) - .box_err() - .with_context(|| ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: format!("Failed to build plan, query:{}", request.query), - })? + if let Some(resp) = self.maybe_forward_sql_query(&sql_query_request).await? { + match resp { + ForwardResult::Forwarded(resp) => return convert_sql_response_to_output(resp?), + ForwardResult::Local => (), } }; + // Open partition table if needed. + let table_name = frontend::parse_table_name(&stmts); + if let Some(table_name) = table_name { + self.maybe_open_partition_table_if_not_exist(&ctx.catalog, &ctx.schema, &table_name) + .await?; + } + + // Create logical plan + // Note: Remember to store sql in error when creating logical plan + let plan = frontend + .statement_to_plan(&mut sql_ctx, stmts.remove(0)) + .box_err() + .with_context(|| ErrWithCause { + code: StatusCode::BAD_REQUEST, + msg: format!("Failed to build plan, query:{}", req.query), + })?; + self.instance .limiter .try_limit(&plan) @@ -189,7 +144,7 @@ impl Proxy { "Query handler finished, request_id:{}, cost:{}ms, request:{:?}", request_id, begin_instant.saturating_elapsed().as_millis(), - query_request + req ); Ok(output) @@ -259,15 +214,6 @@ impl Serialize for ResponseRows { } } -#[derive(Debug)] -pub enum QueryRequest { - Sql(Request), - // TODO: influxql include more parameters, we should add it in later. - // TODO: remove dead_code after implement influxql with proxy - #[allow(dead_code)] - Influxql(InfluxqlRequest), -} - // Convert output to json pub fn convert_output(output: Output) -> Response { match output { diff --git a/proxy/src/influxdb/mod.rs b/proxy/src/influxdb/mod.rs new file mode 100644 index 0000000000..974a975c28 --- /dev/null +++ b/proxy/src/influxdb/mod.rs @@ -0,0 +1,186 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +//! This module implements [write][1] and [query][2] for InfluxDB. +//! [1]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint +//! [2]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint + +pub mod types; + +use std::time::Instant; + +use common_types::request_id::RequestId; +use common_util::{error::BoxError, time::InstantExt}; +use http::StatusCode; +use interpreters::interpreter::Output; +use log::{debug, info}; +use query_engine::executor::Executor as QueryExecutor; +use query_frontend::{ + frontend::{Context as SqlContext, Frontend}, + provider::CatalogMetaProvider, +}; +use snafu::{ensure, ResultExt}; + +use crate::{ + context::RequestContext, + error::{ErrNoCause, ErrWithCause, Internal, Result}, + execute_plan, + grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext}, + influxdb::types::{ + convert_influxql_output, convert_write_request, InfluxqlRequest, InfluxqlResponse, + WriteRequest, WriteResponse, + }, + Proxy, +}; + +impl Proxy { + pub async fn handle_influxdb_query( + &self, + ctx: RequestContext, + req: InfluxqlRequest, + ) -> Result { + let output = self.fetch_influxdb_query_output(ctx, req).await?; + convert_influxql_output(output) + } + + pub async fn handle_influxdb_write( + &self, + ctx: RequestContext, + req: WriteRequest, + ) -> Result { + let request_id = RequestId::next_id(); + let deadline = ctx.timeout.map(|t| Instant::now() + t); + let catalog = &ctx.catalog; + self.instance.catalog_manager.default_catalog_name(); + let schema = &ctx.schema; + let schema_config = self + .schema_config_provider + .schema_config(schema) + .box_err() + .with_context(|| Internal { + msg: format!("get schema config failed, schema:{schema}"), + })?; + + let write_context = + WriteContext::new(request_id, deadline, catalog.clone(), schema.clone()); + + let plans = write_request_to_insert_plan( + self.instance.clone(), + convert_write_request(req)?, + schema_config, + write_context, + ) + .await + .box_err() + .with_context(|| Internal { + msg: "write request to insert plan", + })?; + + let mut success = 0; + for insert_plan in plans { + success += execute_insert_plan( + request_id, + catalog, + schema, + self.instance.clone(), + insert_plan, + deadline, + ) + .await + .box_err() + .with_context(|| Internal { + msg: "execute plan", + })?; + } + debug!( + "Influxdb write finished, catalog:{}, schema:{}, success:{}", + catalog, schema, success + ); + + Ok(()) + } + + async fn fetch_influxdb_query_output( + &self, + ctx: RequestContext, + req: InfluxqlRequest, + ) -> Result { + let request_id = RequestId::next_id(); + let begin_instant = Instant::now(); + let deadline = ctx.timeout.map(|t| begin_instant + t); + + info!( + "Influxdb query handler try to process request, request_id:{}, request:{:?}", + request_id, req + ); + + // TODO(yingwen): Privilege check, cannot access data of other tenant + // TODO(yingwen): Maybe move MetaProvider to instance + let provider = CatalogMetaProvider { + manager: self.instance.catalog_manager.clone(), + default_catalog: &ctx.catalog, + default_schema: &ctx.schema, + function_registry: &*self.instance.function_registry, + }; + let frontend = Frontend::new(provider); + let mut sql_ctx = SqlContext::new(request_id, deadline); + + let mut stmts = frontend + .parse_influxql(&mut sql_ctx, &req.query) + .box_err() + .with_context(|| ErrWithCause { + code: StatusCode::BAD_REQUEST, + msg: format!("Failed to parse influxql, query:{}", req.query), + })?; + + if stmts.is_empty() { + return Ok(Output::AffectedRows(0)); + } + + ensure!( + stmts.len() == 1, + ErrNoCause { + code: StatusCode::BAD_REQUEST, + msg: format!( + "Only support execute one statement now, current num:{}, query:{}.", + stmts.len(), + req.query + ), + } + ); + + let plan = frontend + .influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0)) + .box_err() + .with_context(|| ErrWithCause { + code: StatusCode::BAD_REQUEST, + msg: format!("Failed to build plan, query:{}", req.query), + })?; + + self.instance + .limiter + .try_limit(&plan) + .box_err() + .context(ErrWithCause { + code: StatusCode::INTERNAL_SERVER_ERROR, + msg: "Query is blocked", + })?; + let output = execute_plan( + request_id, + &ctx.catalog, + &ctx.schema, + self.instance.clone(), + plan, + deadline, + ) + .await?; + + info!( + "Influxdb query handler finished, request_id:{}, cost:{}ms, request:{:?}", + request_id, + begin_instant.saturating_elapsed().as_millis(), + req + ); + + Ok(output) + } +} diff --git a/proxy/src/handlers/influxdb.rs b/proxy/src/influxdb/types.rs similarity index 85% rename from proxy/src/handlers/influxdb.rs rename to proxy/src/influxdb/types.rs index 7924c3e0f6..c55213e8a7 100644 --- a/proxy/src/handlers/influxdb.rs +++ b/proxy/src/influxdb/types.rs @@ -1,50 +1,26 @@ -// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. -//! This module implements [write][1] and [query][2] for InfluxDB. -//! [1]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint -//! [2]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint +//! This module contains the types for InfluxDB. -use std::{ - collections::{BTreeMap, HashMap}, - sync::Arc, - time::Instant, -}; +use std::collections::{BTreeMap, HashMap}; use bytes::Bytes; use ceresdbproto::storage::{ value, Field, FieldGroup, Tag, Value, WriteSeriesEntry, WriteTableRequest, }; use common_types::{ - column_schema::ColumnSchema, datum::Datum, record_batch::RecordBatch, request_id::RequestId, - schema::RecordSchema, time::Timestamp, + column_schema::ColumnSchema, datum::Datum, record_batch::RecordBatch, schema::RecordSchema, + time::Timestamp, }; use common_util::error::BoxError; -use handlers::{ - error::{InfluxDbHandlerNoCause, InfluxDbHandlerWithCause, Result}, - query::QueryRequest, -}; use http::Method; use influxdb_line_protocol::FieldValue; use interpreters::interpreter::Output; -use log::debug; -use query_engine::executor::Executor as QueryExecutor; use query_frontend::influxql::planner::CERESDB_MEASUREMENT_COLUMN_NAME; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; -use warp::{reject, reply, Rejection, Reply}; - -use crate::{ - context::RequestContext, - grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext}, - handlers, - instance::InstanceRef, - schema_config_provider::SchemaConfigProviderRef, -}; -pub struct InfluxDb { - instance: InstanceRef, - schema_config_provider: SchemaConfigProviderRef, -} +use crate::error::{Internal, InternalNoCause, Result}; /// Influxql write request compatible with influxdb 1.8 /// @@ -134,21 +110,21 @@ impl InfluxqlRequest { // - q: required(in body when POST and parameters when GET) // - chunked,db,epoch,pretty: in parameters if body.contains_key("params") { - return InfluxDbHandlerNoCause { + return InternalNoCause { msg: "`params` is not supported now", } .fail(); } let query = match method { - Method::GET => params.q.context(InfluxDbHandlerNoCause { + Method::GET => params.q.context(InternalNoCause { msg: "query not found when query by GET", })?, - Method::POST => body.remove("q").context(InfluxDbHandlerNoCause { + Method::POST => body.remove("q").context(InternalNoCause { msg: "query not found when query by POST", })?, other => { - return InfluxDbHandlerNoCause { + return InternalNoCause { msg: format!("method not allowed in query, method:{other}"), } .fail() @@ -306,7 +282,7 @@ impl InfluxqlResultBuilder { let column_schemas = record_schema.columns().to_owned(); ensure!( !column_schemas.is_empty(), - InfluxDbHandlerNoCause { + InternalNoCause { msg: "empty schema", } ); @@ -321,7 +297,7 @@ impl InfluxqlResultBuilder { // described when introducing `column_schemas`. let mut col_iter = column_schemas.iter().enumerate(); // The first column may be measurement column in normal. - ensure!(col_iter.next().unwrap().1.name == CERESDB_MEASUREMENT_COLUMN_NAME, InfluxDbHandlerNoCause { + ensure!(col_iter.next().unwrap().1.name == CERESDB_MEASUREMENT_COLUMN_NAME, InternalNoCause { msg: format!("invalid schema whose first column is not measurement column, schema:{column_schemas:?}"), }); @@ -353,7 +329,7 @@ impl InfluxqlResultBuilder { // Check schema's compatibility. ensure!( record_batch.schema().columns() == self.column_schemas, - InfluxDbHandlerNoCause { + InternalNoCause { msg: format!( "conflict schema, origin:{:?}, new:{:?}", self.column_schemas, @@ -444,7 +420,7 @@ impl InfluxqlResultBuilder { match measurement { Datum::String(m) => m.to_string(), other => { - return InfluxDbHandlerNoCause { + return InternalNoCause { msg: format!("invalid measurement column, column:{other:?}"), } .fail() @@ -459,7 +435,7 @@ impl InfluxqlResultBuilder { Datum::Null => "".to_string(), Datum::String(tag) => tag.to_string(), other => { - return InfluxDbHandlerNoCause { + return InternalNoCause { msg: format!("invalid tag column, column:{other:?}"), } .fail() @@ -497,93 +473,17 @@ struct GroupKey { group_by_tag_values: Vec, } -impl InfluxDb { - pub fn new(instance: InstanceRef, schema_config_provider: SchemaConfigProviderRef) -> Self { - Self { - instance, - schema_config_provider, - } - } - - async fn query(&self, ctx: RequestContext, req: QueryRequest) -> Result { - let output = handlers::query::handle_query(&ctx, self.instance.clone(), req) - .await - .box_err() - .context(InfluxDbHandlerWithCause { - msg: "failed to query by influxql", - })?; - - convert_influxql_output(output) - } - - async fn write(&self, ctx: RequestContext, req: WriteRequest) -> Result { - let request_id = RequestId::next_id(); - let deadline = ctx.timeout.map(|t| Instant::now() + t); - let catalog = &ctx.catalog; - self.instance.catalog_manager.default_catalog_name(); - let schema = &ctx.schema; - let schema_config = self - .schema_config_provider - .schema_config(schema) - .box_err() - .with_context(|| InfluxDbHandlerWithCause { - msg: format!("get schema config failed, schema:{schema}"), - })?; - - let write_context = - WriteContext::new(request_id, deadline, catalog.clone(), schema.clone()); - - let plans = write_request_to_insert_plan( - self.instance.clone(), - convert_write_request(req)?, - schema_config, - write_context, - ) - .await - .box_err() - .with_context(|| InfluxDbHandlerWithCause { - msg: "write request to insert plan", - })?; - - let mut success = 0; - for insert_plan in plans { - success += execute_insert_plan( - request_id, - catalog, - schema, - self.instance.clone(), - insert_plan, - deadline, - ) - .await - .box_err() - .with_context(|| InfluxDbHandlerWithCause { - msg: "execute plan", - })?; - } - debug!( - "Influxdb write finished, catalog:{}, schema:{}, success:{}", - catalog, schema, success - ); - - Ok(()) - } -} - -fn convert_write_request(req: WriteRequest) -> Result> { +pub(crate) fn convert_write_request(req: WriteRequest) -> Result> { let mut req_by_measurement = HashMap::new(); for line in influxdb_line_protocol::parse_lines(&req.lines) { - let mut line = line.box_err().with_context(|| InfluxDbHandlerWithCause { + let mut line = line.box_err().with_context(|| Internal { msg: "invalid line", })?; let timestamp = match line.timestamp { - Some(ts) => req - .precision - .try_normalize(ts) - .context(InfluxDbHandlerNoCause { - msg: "time outside range -9223372036854775806 - 9223372036854775806", - })?, + Some(ts) => req.precision.try_normalize(ts).context(InternalNoCause { + msg: "time outside range -9223372036854775806 - 9223372036854775806", + })?, None => Timestamp::now().as_i64(), }; let mut tag_set = line.series.tag_set.unwrap_or_default(); @@ -661,12 +561,12 @@ fn convert_influx_value(field_value: FieldValue) -> Value { Value { value: Some(v) } } -fn convert_influxql_output(output: Output) -> Result { +pub(crate) fn convert_influxql_output(output: Output) -> Result { // TODO: now, we just support one influxql in each query. let records = match output { Output::Records(records) => records, Output::AffectedRows(_) => { - return InfluxDbHandlerNoCause { + return InternalNoCause { msg: "output in influxql should not be affected rows", } .fail() @@ -694,32 +594,10 @@ fn convert_influxql_output(output: Output) -> Result { }) } -// TODO: Request and response type don't match influxdb's API now. -pub async fn query( - ctx: RequestContext, - db: Arc>, - req: QueryRequest, -) -> std::result::Result { - db.query(ctx, req) - .await - .map_err(reject::custom) - .map(|v| reply::json(&v)) -} - -// TODO: Request and response type don't match influxdb's API now. -pub async fn write( - ctx: RequestContext, - db: Arc>, - req: WriteRequest, -) -> std::result::Result { - db.write(ctx, req) - .await - .map_err(reject::custom) - .map(|_| warp::http::StatusCode::NO_CONTENT) -} - #[cfg(test)] mod tests { + use std::sync::Arc; + use arrow::datatypes::{Field as ArrowField, Schema as ArrowSchema}; use common_types::{ column::{ColumnBlock, ColumnBlockBuilder}, diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 49de4950c3..50ed94470b 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -15,6 +15,7 @@ pub mod handlers; pub mod hotspot; mod hotspot_lru; pub mod http; +pub mod influxdb; pub mod instance; pub mod limiter; pub mod schema_config_provider; diff --git a/server/src/http.rs b/server/src/http.rs index 5c12b78f73..9d8178f112 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -13,20 +13,16 @@ use common_util::{ error::{BoxError, GenericError}, runtime::Runtime, }; -use handlers::query::QueryRequest as HandlerQueryRequest; use log::{error, info}; use logger::RuntimeLevel; use profile::Profiler; use prom_remote_api::web; use proxy::{ context::RequestContext, - handlers::{ - self, - influxdb::{self, InfluxDb, InfluxqlParams, InfluxqlRequest, WriteParams, WriteRequest}, - }, - http::query::{convert_output, QueryRequest, Request}, + handlers::{self}, + http::sql::{convert_output, Request}, + influxdb::types::{InfluxqlParams, InfluxqlRequest, WriteParams, WriteRequest}, instance::InstanceRef, - schema_config_provider::SchemaConfigProviderRef, Proxy, }; use query_engine::executor::Executor as QueryExecutor; @@ -126,7 +122,6 @@ pub struct Service { engine_runtimes: Arc, log_runtime: Arc, profiler: Arc, - influxdb: Arc>, tx: Sender<()>, rx: Option>, config: HttpConfig, @@ -242,9 +237,8 @@ impl Service { .and(self.with_context()) .and(self.with_proxy()) .and_then(|req, ctx, proxy: Arc>| async move { - let req = QueryRequest::Sql(req); let result = proxy - .handle_query(&ctx, req) + .handle_http_sql_query(&ctx, req) .await .map(convert_output) .box_err() @@ -291,12 +285,16 @@ impl Service { .and(warp::post()) .and(body_limit) .and(self.with_context()) - .and(self.with_influxdb()) .and(warp::query::()) .and(warp::body::bytes()) - .and_then(|ctx, db, params, lines| async move { + .and(self.with_proxy()) + .and_then(|ctx, params, lines, proxy: Arc>| async move { let request = WriteRequest::new(lines, params); - influxdb::write(ctx, db, request).await + let result = proxy.handle_influxdb_write(ctx, request).await; + match result { + Ok(res) => Ok(reply::json(&res)), + Err(e) => Err(reject::custom(e)), + } }); // Query support both get and post method, so we can't add `body_limit` here. @@ -305,14 +303,24 @@ impl Service { let query_api = warp::path!("query") .and(warp::method()) .and(self.with_context()) - .and(self.with_influxdb()) .and(warp::query::()) .and(warp::body::form::>()) - .and_then(|method, ctx, db, params, body| async move { - let request = - InfluxqlRequest::try_new(method, body, params).map_err(reject::custom)?; - influxdb::query(ctx, db, HandlerQueryRequest::Influxql(request)).await - }); + .and(self.with_proxy()) + .and_then( + |method, ctx, params, body, proxy: Arc>| async move { + let request = + InfluxqlRequest::try_new(method, body, params).map_err(reject::custom)?; + let result = proxy + .handle_influxdb_query(ctx, request) + .await + .box_err() + .context(HandleRequest); + match result { + Ok(res) => Ok(reply::json(&res)), + Err(e) => Err(reject::custom(e)), + } + }, + ); warp::path!("influxdb" / "v1" / ..).and(write_api.or(query_api)) } @@ -558,13 +566,6 @@ impl Service { warp::any().map(move || runtime.clone()) } - fn with_influxdb( - &self, - ) -> impl Filter>,), Error = Infallible> + Clone { - let influxdb = self.influxdb.clone(); - warp::any().map(move || influxdb.clone()) - } - fn with_instance( &self, ) -> impl Filter,), Error = Infallible> + Clone { @@ -590,8 +591,6 @@ pub struct Builder { config: HttpConfig, engine_runtimes: Option>, log_runtime: Option>, - instance: Option>, - schema_config_provider: Option, config_content: Option, proxy: Option>>, router: Option, @@ -604,8 +603,6 @@ impl Builder { config, engine_runtimes: None, log_runtime: None, - instance: None, - schema_config_provider: None, config_content: None, proxy: None, router: None, @@ -623,16 +620,6 @@ impl Builder { self } - pub fn instance(mut self, instance: InstanceRef) -> Self { - self.instance = Some(instance); - self - } - - pub fn schema_config_provider(mut self, provider: SchemaConfigProviderRef) -> Self { - self.schema_config_provider = Some(provider); - self - } - pub fn config_content(mut self, content: String) -> Self { self.config_content = Some(content); self @@ -659,23 +646,17 @@ impl Builder { pub fn build(self) -> Result> { let engine_runtimes = self.engine_runtimes.context(MissingEngineRuntimes)?; let log_runtime = self.log_runtime.context(MissingLogRuntime)?; - let instance = self.instance.context(MissingInstance)?; let config_content = self.config_content.context(MissingInstance)?; let proxy = self.proxy.context(MissingProxy)?; - let schema_config_provider = self - .schema_config_provider - .context(MissingSchemaConfigProvider)?; let router = self.router.context(MissingRouter)?; let opened_wals = self.opened_wals.context(MissingWal)?; - let influxdb = Arc::new(InfluxDb::new(instance, schema_config_provider)); let (tx, rx) = oneshot::channel(); let service = Service { proxy, engine_runtimes, log_runtime, - influxdb, profiler: Arc::new(Profiler::default()), tx, rx: Some(rx), diff --git a/server/src/server.rs b/server/src/server.rs index ade03c3628..34744a9d47 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -369,8 +369,6 @@ impl Builder { let http_service = http::Builder::new(http_config) .engine_runtimes(engine_runtimes.clone()) .log_runtime(log_runtime) - .instance(instance.clone()) - .schema_config_provider(provider.clone()) .config_content(config_content) .proxy(proxy.clone()) .router(router.clone())