address CR.
Rachelint committed Mar 24, 2023
1 parent 36af326 commit 3b6c41b
Showing 1 changed file with 35 additions and 66 deletions.
101 changes: 35 additions & 66 deletions server/src/handlers/influxdb.rs
@@ -4,7 +4,11 @@
//! [1]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint
//! [2]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint

use std::{collections::HashMap, sync::Arc, time::Instant};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
time::Instant,
};

use bytes::Bytes;
use ceresdbproto::storage::{
@@ -23,7 +27,7 @@ use influxdb_line_protocol::FieldValue;
use interpreters::interpreter::Output;
use log::debug;
use query_engine::executor::Executor as QueryExecutor;
use serde::{ser::SerializeMap, Serialize};
use serde::Serialize;
use snafu::{ensure, ResultExt};
use sql::influxql::planner::CERESDB_MEASUREMENT_COLUMN_NAME;
use warp::{reject, reply, Rejection, Reply};
@@ -77,80 +81,42 @@ impl From<Bytes> for WriteRequest {

pub type WriteResponse = ();

/// Influxql response organized in the same way with influxdb
/// Influxql response organized in the same way as InfluxDB's.
///
/// The basic example:
/// {"results":[{"statement_id":0,"series":[{"name":"mymeas",
///
/// "columns":["time","myfield","mytag1","mytag2"],
/// "values":[["2017-03-01T00:16:18Z",33.1,null,null],
/// ["2017-03-01T00:17:18Z",12.4,"12","14"]]}]}]}
///
/// You can see more details in:
/// https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-data-with-a-select-statement
#[derive(Debug, Serialize)]
pub struct InfluxqlResponse {
pub results: Vec<InfluxqlResult>,
pub results: Vec<OneInfluxqlResult>,
}

#[derive(Debug)]
pub struct InfluxqlResult {
#[derive(Debug, Serialize)]
pub struct OneInfluxqlResult {
statement_id: u32,
#[serde(skip_serializing_if = "Option::is_none")]
series: Option<Vec<OneSeries>>,
}

impl Serialize for InfluxqlResult {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut influxql_result = serializer.serialize_map(Some(4))?;
influxql_result.serialize_entry("statement_id", &self.statement_id)?;
if let Some(series) = &self.series {
influxql_result.serialize_entry("series", series)?;
}

influxql_result.end()
}
}

#[derive(Debug)]
#[derive(Debug, Serialize)]
struct OneSeries {
name: String,
tags: Option<Tags>,
#[serde(skip_serializing_if = "Option::is_none")]
tags: Option<BTreeMap<String, String>>,
columns: Vec<String>,
values: Vec<Vec<Datum>>,
}

impl Serialize for OneSeries {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut one_series = serializer.serialize_map(Some(4))?;
one_series.serialize_entry("name", &self.name)?;
if let Some(tags) = &self.tags {
one_series.serialize_entry("tags", &tags)?;
}
one_series.serialize_entry("columns", &self.columns)?;
one_series.serialize_entry("values", &self.values)?;

one_series.end()
}
}

#[derive(Debug)]
struct Tags(Vec<(String, String)>);

impl Serialize for Tags {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut tags = serializer.serialize_map(Some(self.0.len()))?;

for (tagk, tagv) in &self.0 {
tags.serialize_entry(tagk, tagv)?;
}

tags.end()
}
}

/// [InfluxqlResult] builder
#[derive(Default)]
pub struct InfluxqlResultBuilder {
/// Id of the related influxql
/// Query id for [multiple queries](https://docs.influxdata.com/influxdb/v1.8/tools/api/#request-multiple-queries)
statement_id: u32,

/// Schema of influxql query result
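For context, here is a minimal standalone sketch of the serialization behaviour this hunk switches to (assuming `serde` with the `derive` feature and `serde_json`; `Datum` is stubbed as `serde_json::Value` here, not the crate's real type). The derived impls plus `skip_serializing_if` and a `BTreeMap` for tags produce the same JSON shape as the removed hand-written `SerializeMap` code:

```rust
use std::collections::BTreeMap;

use serde::Serialize;

// Stand-in for the crate's real `Datum` type, used for illustration only.
type Datum = serde_json::Value;

#[derive(Debug, Serialize)]
struct OneInfluxqlResult {
    statement_id: u32,
    // The key is omitted entirely when there is no series, matching the old manual impl.
    #[serde(skip_serializing_if = "Option::is_none")]
    series: Option<Vec<OneSeries>>,
}

#[derive(Debug, Serialize)]
struct OneSeries {
    name: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    tags: Option<BTreeMap<String, String>>,
    columns: Vec<String>,
    values: Vec<Vec<Datum>>,
}

fn main() {
    let result = OneInfluxqlResult {
        statement_id: 0,
        series: Some(vec![OneSeries {
            name: "mymeas".to_string(),
            tags: None,
            columns: vec!["time".into(), "myfield".into()],
            values: vec![vec!["2017-03-01T00:16:18Z".into(), 33.1.into()]],
        }]),
    };
    // Prints:
    // {"statement_id":0,"series":[{"name":"mymeas","columns":["time","myfield"],
    //  "values":[["2017-03-01T00:16:18Z",33.1]]}]}
    println!("{}", serde_json::to_string(&result).unwrap());
}
```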
@@ -174,10 +140,13 @@ pub struct InfluxqlResultBuilder {
/// we just use the `measurement` + `tag values` to distinguish them.
group_key_to_idx: HashMap<GroupKey, usize>,

/// Column values
value_groups: Vec<Vec<Vec<Datum>>>,
/// Column values grouped by [GroupKey]
value_groups: Vec<RowGroup>,
}

type Row = Vec<Datum>;
type RowGroup = Vec<Row>;

impl InfluxqlResultBuilder {
pub fn new(record_schema: &RecordSchema, statement_id: u32) -> Result<Self> {
let column_schemas = record_schema.columns().to_owned();
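A hedged sketch of the grouping these fields support (the builder's `add_record` body is collapsed in this diff, so the helper name and the group-key computation below are assumptions, not the crate's API): rows that share a group key, i.e. the measurement plus its tag values, are appended to the same `RowGroup`, and `group_key_to_idx` remembers which slot of `value_groups` that group occupies.

```rust
use std::collections::HashMap;

// Illustrative stand-ins; the real `Datum` and `GroupKey` types live in the crate.
type Datum = String;
type GroupKey = Vec<Datum>; // measurement + tag values
type Row = Vec<Datum>;
type RowGroup = Vec<Row>;

/// Hypothetical helper: push a row into the group it belongs to,
/// creating the group the first time its key is seen.
fn push_row(
    group_key_to_idx: &mut HashMap<GroupKey, usize>,
    value_groups: &mut Vec<RowGroup>,
    group_key: GroupKey,
    row: Row,
) {
    let idx = *group_key_to_idx.entry(group_key).or_insert_with(|| {
        value_groups.push(RowGroup::new());
        value_groups.len() - 1
    });
    value_groups[idx].push(row);
}
```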
@@ -256,7 +225,7 @@ impl InfluxqlResultBuilder {
Ok(())
}

pub fn build(self) -> InfluxqlResult {
pub fn build(self) -> OneInfluxqlResult {
let ordered_group_keys = {
let mut ordered_pairs = self
.group_key_to_idx
@@ -288,9 +257,9 @@

(tagk, tagv)
})
.collect::<Vec<_>>();
.collect::<BTreeMap<_, _>>();

Some(Tags(tags))
Some(tags)
};

let columns = self
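A tiny standalone illustration (assuming only `serde_json`) of why collecting the tag pairs into a `BTreeMap` is convenient here: the map serializes directly as a JSON object, which is what the removed `Tags` wrapper implemented by hand, and it additionally keeps tag keys in a stable, sorted order.

```rust
use std::collections::BTreeMap;

fn main() {
    // Collecting (tag key, tag value) pairs into a BTreeMap deduplicates
    // keys and sorts them, so the serialized "tags" object is stable.
    let tags: BTreeMap<_, _> = vec![("mytag2", "14"), ("mytag1", "12")]
        .into_iter()
        .collect();
    // Prints: {"mytag1":"12","mytag2":"14"}
    println!("{}", serde_json::to_string(&tags).unwrap());
}
```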
@@ -308,7 +277,7 @@
})
.collect();

InfluxqlResult {
OneInfluxqlResult {
series: Some(series),
statement_id: self.statement_id,
}
@@ -537,7 +506,7 @@ fn convert_influxql_output(output: Output) -> Result<InfluxqlResponse> {
// TODO: now, we just support one influxql in each query.
let influxql_result = if let Output::Records(records) = output {
if records.is_empty() {
InfluxqlResult {
OneInfluxqlResult {
statement_id: 0,
series: None,
}