Skip to content

Commit

Permalink
feat: impl influxdb api with proxy (apache#875)
Browse files Browse the repository at this point in the history
## Which issue does this PR close?

Closes #

## Rationale for this change
 Implement influxdb api with proxy.

## What changes are included in this PR?
* Add `influxdb` in proxy module.
* Remove some useless code in http query.

## Are there any user-facing changes?
No.

## How does this change test
CI.
  • Loading branch information
chunshao90 authored May 9, 2023
1 parent 066212b commit ea0759d
Show file tree
Hide file tree
Showing 12 changed files with 305 additions and 355 deletions.
6 changes: 4 additions & 2 deletions catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ pub enum Error {
#[snafu(display("Failed to operate table, msg:{}, err:{}", msg, source))]
TableOperatorWithCause { msg: String, source: GenericError },

#[snafu(display("Failed to operate table, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
TableOperatorNoCause { msg: String, backtrace: Backtrace },
// Fixme: Temporarily remove the stack information, otherwise you will encounter a
// segmentation fault.
#[snafu(display("Failed to operate table, msg:{}.\n", msg))]
TableOperatorNoCause { msg: String },
}

define_result!(Error);
Expand Down
2 changes: 1 addition & 1 deletion catalog/src/table_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl TableOperator {
} else {
TableOperatorNoCause {
msg: format!(
"Failed to open shard, some tables open failed, no table is shard id:{shard_id}, opened count:{no_table_count}, open error count:{open_err_count}"),
"Failed to open shard, some tables open failed, shard id:{shard_id}, no table is opened count:{no_table_count}, open error count:{open_err_count}"),
}.fail()
}
}
Expand Down
13 changes: 1 addition & 12 deletions proxy/src/handlers/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

//! Error of handlers

use common_util::{define_result, error::GenericError};
use common_util::define_result;
use snafu::{Backtrace, Snafu};
use warp::reject::Reject;

Expand All @@ -18,11 +18,6 @@ pub enum Error {
source: query_frontend::frontend::Error,
},

#[snafu(display("Failed to parse influxql, err:{}", source))]
ParseInfluxql {
source: query_frontend::frontend::Error,
},

#[snafu(display("Failed to create plan, query:{}, err:{}", query, source))]
CreatePlan {
query: String,
Expand Down Expand Up @@ -77,12 +72,6 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display("InfluxDb handler failed, msg:{}, source:{}", msg, source))]
InfluxDbHandlerWithCause { msg: String, source: GenericError },

#[snafu(display("InfluxDb handler failed, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
InfluxDbHandlerNoCause { msg: String, backtrace: Backtrace },

#[snafu(display("Route handler failed, table:{:?}, source:{}", table, source))]
RouteHandler {
table: String,
Expand Down
1 change: 0 additions & 1 deletion proxy/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

pub mod admin;
pub(crate) mod error;
pub mod influxdb;
pub mod query;
pub mod route;

Expand Down
32 changes: 1 addition & 31 deletions proxy/src/handlers/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ use serde::{
use snafu::{ensure, ResultExt};

use crate::handlers::{
error::{
CreatePlan, InterpreterExec, ParseInfluxql, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt,
},
influxdb::InfluxqlRequest,
error::{CreatePlan, InterpreterExec, ParseSql, QueryBlock, QueryTimeout, TooMuchStmt},
prelude::*,
};

Expand Down Expand Up @@ -101,14 +98,11 @@ impl From<Bytes> for Request {
#[derive(Debug)]
pub enum QueryRequest {
Sql(Request),
// TODO: influxql include more parameters, we should add it in later.
Influxql(InfluxqlRequest),
}
impl QueryRequest {
pub fn query(&self) -> &str {
match self {
QueryRequest::Sql(request) => request.query.as_str(),
QueryRequest::Influxql(request) => request.query.as_str(),
}
}
}
Expand Down Expand Up @@ -168,30 +162,6 @@ pub async fn handle_query<Q: QueryExecutor + 'static>(
query: &request.query,
})?
}

QueryRequest::Influxql(request) => {
let mut stmts = frontend
.parse_influxql(&mut sql_ctx, &request.query)
.context(ParseInfluxql)?;

if stmts.is_empty() {
return Ok(Output::AffectedRows(0));
}

ensure!(
stmts.len() == 1,
TooMuchStmt {
len: stmts.len(),
query: &request.query,
}
);

frontend
.influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0))
.context(CreatePlan {
query: &request.query,
})?
}
};

instance.limiter.try_limit(&plan).context(QueryBlock {
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

pub mod prom;
pub mod query;
pub mod sql;
172 changes: 59 additions & 113 deletions proxy/src/http/query.rs → proxy/src/http/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,22 @@ use crate::{
error::{ErrNoCause, ErrWithCause, Internal, InternalNoCause, Result},
execute_plan,
forward::ForwardResult,
handlers::influxdb::InfluxqlRequest,
Proxy,
};

impl<Q: QueryExecutor + 'static> Proxy<Q> {
pub async fn handle_query(
pub async fn handle_http_sql_query(
&self,
ctx: &RequestContext,
query_request: QueryRequest,
req: Request,
) -> Result<Output> {
let request_id = RequestId::next_id();
let begin_instant = Instant::now();
let deadline = ctx.timeout.map(|t| begin_instant + t);

info!(
"Query handler try to process request, request_id:{}, request:{:?}",
request_id, query_request
request_id, req
);

// TODO(yingwen): Privilege check, cannot access data of other tenant
Expand All @@ -63,110 +62,66 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
let frontend = Frontend::new(provider);
let mut sql_ctx = SqlContext::new(request_id, deadline);

let plan = match &query_request {
QueryRequest::Sql(request) => {
// Parse sql, frontend error of invalid sql already contains sql
// TODO(yingwen): Maybe move sql from frontend error to outer error
let mut stmts = frontend
.parse_sql(&mut sql_ctx, &request.query)
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to parse sql, query:{}", request.query),
})?;

if stmts.is_empty() {
return Ok(Output::AffectedRows(0));
}

// TODO(yingwen): For simplicity, we only support executing one statement now
// TODO(yingwen): INSERT/UPDATE/DELETE can be batched
ensure!(
stmts.len() == 1,
ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: format!(
"Only support execute one statement now, current num:{}, query:{}.",
stmts.len(),
request.query
),
}
);

let sql_query_request = SqlQueryRequest {
context: Some(GrpcRequestContext {
database: ctx.schema.clone(),
}),
tables: vec![],
sql: request.query.clone(),
};

if let Some(resp) = self.maybe_forward_sql_query(&sql_query_request).await? {
match resp {
ForwardResult::Forwarded(resp) => {
return convert_sql_response_to_output(resp?)
}
ForwardResult::Local => (),
}
};

// Open partition table if needed.
let table_name = frontend::parse_table_name(&stmts);
if let Some(table_name) = table_name {
self.maybe_open_partition_table_if_not_exist(
&ctx.catalog,
&ctx.schema,
&table_name,
)
.await?;
}

// Create logical plan
// Note: Remember to store sql in error when creating logical plan
frontend
.statement_to_plan(&mut sql_ctx, stmts.remove(0))
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to build plan, query:{}", request.query),
})?
// Parse sql, frontend error of invalid sql already contains sql
// TODO(yingwen): Maybe move sql from frontend error to outer error
let mut stmts = frontend
.parse_sql(&mut sql_ctx, &req.query)
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to parse sql, query:{}", req.query),
})?;

if stmts.is_empty() {
return Ok(Output::AffectedRows(0));
}

// TODO(yingwen): For simplicity, we only support executing one statement now
// TODO(yingwen): INSERT/UPDATE/DELETE can be batched
ensure!(
stmts.len() == 1,
ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: format!(
"Only support execute one statement now, current num:{}, query:{}.",
stmts.len(),
req.query
),
}
);

let sql_query_request = SqlQueryRequest {
context: Some(GrpcRequestContext {
database: ctx.schema.clone(),
}),
tables: vec![],
sql: req.query.clone(),
};

QueryRequest::Influxql(request) => {
let mut stmts = frontend
.parse_influxql(&mut sql_ctx, &request.query)
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to parse influxql, query:{}", request.query),
})?;

if stmts.is_empty() {
return Ok(Output::AffectedRows(0));
}

ensure!(
stmts.len() == 1,
ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: format!(
"Only support execute one statement now, current num:{}, query:{}.",
stmts.len(),
request.query
),
}
);

frontend
.influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0))
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to build plan, query:{}", request.query),
})?
if let Some(resp) = self.maybe_forward_sql_query(&sql_query_request).await? {
match resp {
ForwardResult::Forwarded(resp) => return convert_sql_response_to_output(resp?),
ForwardResult::Local => (),
}
};

// Open partition table if needed.
let table_name = frontend::parse_table_name(&stmts);
if let Some(table_name) = table_name {
self.maybe_open_partition_table_if_not_exist(&ctx.catalog, &ctx.schema, &table_name)
.await?;
}

// Create logical plan
// Note: Remember to store sql in error when creating logical plan
let plan = frontend
.statement_to_plan(&mut sql_ctx, stmts.remove(0))
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to build plan, query:{}", req.query),
})?;

self.instance
.limiter
.try_limit(&plan)
Expand All @@ -189,7 +144,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
"Query handler finished, request_id:{}, cost:{}ms, request:{:?}",
request_id,
begin_instant.saturating_elapsed().as_millis(),
query_request
req
);

Ok(output)
Expand Down Expand Up @@ -259,15 +214,6 @@ impl Serialize for ResponseRows {
}
}

#[derive(Debug)]
pub enum QueryRequest {
Sql(Request),
// TODO: influxql include more parameters, we should add it in later.
// TODO: remove dead_code after implement influxql with proxy
#[allow(dead_code)]
Influxql(InfluxqlRequest),
}

// Convert output to json
pub fn convert_output(output: Output) -> Response {
match output {
Expand Down
Loading

0 comments on commit ea0759d

Please sign in to comment.