Skip to content

Commit

Permalink
feat: Able to pretty print sql query result in http output (#3539)
Browse files Browse the repository at this point in the history
* feat: Able to pretty print sql query result in http output

* fix: add some tests

* fix: add some space, delete fn into_payload, and impl Display for TableResponse
  • Loading branch information
YCCDSZXH authored Mar 20, 2024
1 parent ddbcff6 commit 6377982
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 0 deletions.
31 changes: 31 additions & 0 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use tower_http::decompression::RequestDecompressionLayer;
use tower_http::trace::TraceLayer;

use self::authorize::AuthState;
use self::table_result::TableResponse;
use crate::configurator::ConfiguratorRef;
use crate::error::{AlreadyStartedSnafu, Error, HyperSnafu, Result, ToJsonSnafu};
use crate::http::arrow_result::ArrowResponse;
Expand Down Expand Up @@ -90,6 +91,7 @@ mod dashboard;
pub mod error_result;
pub mod greptime_result_v1;
pub mod influxdb_result_v1;
pub mod table_result;

pub const HTTP_API_VERSION: &str = "v1";
pub const HTTP_API_PREFIX: &str = "/v1/";
Expand Down Expand Up @@ -254,6 +256,7 @@ pub enum GreptimeQueryOutput {
pub enum ResponseFormat {
Arrow,
Csv,
Table,
#[default]
GreptimedbV1,
InfluxdbV1,
Expand All @@ -264,6 +267,7 @@ impl ResponseFormat {
match s {
"arrow" => Some(ResponseFormat::Arrow),
"csv" => Some(ResponseFormat::Csv),
"table" => Some(ResponseFormat::Table),
"greptimedb_v1" => Some(ResponseFormat::GreptimedbV1),
"influxdb_v1" => Some(ResponseFormat::InfluxdbV1),
_ => None,
Expand All @@ -274,6 +278,7 @@ impl ResponseFormat {
match self {
ResponseFormat::Arrow => "arrow",
ResponseFormat::Csv => "csv",
ResponseFormat::Table => "table",
ResponseFormat::GreptimedbV1 => "greptimedb_v1",
ResponseFormat::InfluxdbV1 => "influxdb_v1",
}
Expand Down Expand Up @@ -328,6 +333,7 @@ impl Display for Epoch {
pub enum HttpResponse {
Arrow(ArrowResponse),
Csv(CsvResponse),
Table(TableResponse),
Error(ErrorResponse),
GreptimedbV1(GreptimedbV1Response),
InfluxdbV1(InfluxdbV1Response),
Expand All @@ -338,6 +344,7 @@ impl HttpResponse {
match self {
HttpResponse::Arrow(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::Csv(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::Table(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(),
Expand All @@ -350,6 +357,7 @@ impl IntoResponse for HttpResponse {
match self {
HttpResponse::Arrow(resp) => resp.into_response(),
HttpResponse::Csv(resp) => resp.into_response(),
HttpResponse::Table(resp) => resp.into_response(),
HttpResponse::GreptimedbV1(resp) => resp.into_response(),
HttpResponse::InfluxdbV1(resp) => resp.into_response(),
HttpResponse::Error(resp) => resp.into_response(),
Expand All @@ -373,6 +381,12 @@ impl From<CsvResponse> for HttpResponse {
}
}

impl From<TableResponse> for HttpResponse {
fn from(value: TableResponse) -> Self {
HttpResponse::Table(value)
}
}

impl From<ErrorResponse> for HttpResponse {
fn from(value: ErrorResponse) -> Self {
HttpResponse::Error(value)
Expand Down Expand Up @@ -971,6 +985,7 @@ mod test {
ResponseFormat::GreptimedbV1,
ResponseFormat::InfluxdbV1,
ResponseFormat::Csv,
ResponseFormat::Table,
ResponseFormat::Arrow,
] {
let recordbatches =
Expand All @@ -979,6 +994,7 @@ mod test {
let json_resp = match format {
ResponseFormat::Arrow => ArrowResponse::from_output(outputs).await,
ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
ResponseFormat::Table => TableResponse::from_output(outputs).await,
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await,
};
Expand Down Expand Up @@ -1021,6 +1037,21 @@ mod test {
panic!("invalid output type");
}
}

HttpResponse::Table(resp) => {
let output = &resp.output()[0];
if let GreptimeQueryOutput::Records(r) = output {
assert_eq!(r.num_rows(), 4);
assert_eq!(r.num_cols(), 2);
assert_eq!(r.schema.column_schemas[0].name, "numbers");
assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
assert_eq!(r.rows[0][0], serde_json::Value::from(1));
assert_eq!(r.rows[0][1], serde_json::Value::Null);
} else {
panic!("invalid output type");
}
}

HttpResponse::Arrow(resp) => {
let output = resp.data;
let mut reader =
Expand Down
2 changes: 2 additions & 0 deletions src/servers/src/http/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::http::csv_result::CsvResponse;
use crate::http::error_result::ErrorResponse;
use crate::http::greptime_result_v1::GreptimedbV1Response;
use crate::http::influxdb_result_v1::InfluxdbV1Response;
use crate::http::table_result::TableResponse;
use crate::http::{
ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput,
HttpResponse, ResponseFormat,
Expand Down Expand Up @@ -119,6 +120,7 @@ pub async fn sql(
let resp = match format {
ResponseFormat::Arrow => ArrowResponse::from_output(outputs).await,
ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
ResponseFormat::Table => TableResponse::from_output(outputs).await,
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
};
Expand Down
176 changes: 176 additions & 0 deletions src/servers/src/http/table_result.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::max;
use std::fmt::{Display, Write};

use axum::http::{header, HeaderValue};
use axum::response::{IntoResponse, Response};
use common_error::status_code::StatusCode;
use common_query::Output;
use itertools::Itertools;
use mime_guess::mime;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::http::error_result::ErrorResponse;
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat};

#[derive(Serialize, Deserialize, Debug, JsonSchema)]
pub struct TableResponse {
output: Vec<GreptimeQueryOutput>,
execution_time_ms: u64,
}

impl TableResponse {
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
match handler::from_output(ResponseFormat::Csv, outputs).await {
Err(err) => HttpResponse::Error(err),
Ok((output, _)) => {
if output.len() > 1 {
HttpResponse::Error(ErrorResponse::from_error_message(
ResponseFormat::Table,
StatusCode::InvalidArguments,
"Multi-statements are not allowed".to_string(),
))
} else {
HttpResponse::Table(TableResponse {
output,
execution_time_ms: 0,
})
}
}
}
}

pub fn output(&self) -> &[GreptimeQueryOutput] {
&self.output
}

pub fn with_execution_time(mut self, execution_time: u64) -> Self {
self.execution_time_ms = execution_time;
self
}

pub fn execution_time_ms(&self) -> u64 {
self.execution_time_ms
}
}

impl Display for TableResponse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let payload = match self.output.first() {
None => String::default(),
Some(GreptimeQueryOutput::AffectedRows(n)) => {
format!("{n}\n")
}
Some(GreptimeQueryOutput::Records(records)) => {
let mut max_width = vec![0; records.num_cols()];
let mut result = String::new();
// Determine maximum width for each column
for (i, column) in records.schema.column_schemas.iter().enumerate() {
max_width[i] = max(max_width[i], column.name.len());
}
for row in &records.rows {
for (i, v) in row.iter().enumerate() {
let s = v.to_string();
max_width[i] = max(max_width[i], s.len());
}
}

// Construct the header
let head: String = records
.schema
.column_schemas
.iter()
.enumerate()
.map(|(i, column)| format!("─{:─<1$}─", column.name, max_width[i]))
.join("┬");
writeln!(result, "┌{}┐", head).unwrap();

// Construct rows
for row in &records.rows {
let row = row
.iter()
.enumerate()
.map(|(i, v)| {
let s = v.to_string();
format!(" {:1$} ", s, max_width[i])
})
.join("│");
writeln!(result, "│{row}│").unwrap();
}

// Construct the footer
let footer: String = max_width.iter().map(|v| "─".repeat(*v + 2)).join("┴");
writeln!(result, "└{}┘", footer).unwrap();
result
}
};
write!(f, "{}", payload)
}
}

impl IntoResponse for TableResponse {
fn into_response(self) -> Response {
debug_assert!(
self.output.len() <= 1,
"self.output has extra elements: {}",
self.output.len()
);

let execution_time = self.execution_time_ms;

let mut resp = (
[(
header::CONTENT_TYPE,
HeaderValue::from_static(mime::PLAIN.as_ref()),
)],
self.to_string(),
)
.into_response();
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static("TABLE"),
);
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
HeaderValue::from(execution_time),
);
resp
}
}
#[cfg(test)]
mod test {

use super::TableResponse;

#[tokio::test]
async fn test_table_format() {
let data = r#"{"output":[{"records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"ts","data_type":"TimestampMillisecond"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"}]},"rows":[["127.0.0.1",1702433141000,0.5,0.2],["127.0.0.1",1702433146000,0.3,0.2],["127.0.0.1",1702433151000,0.4,0.3],["127.0.0.2",1702433141000,0.3,0.1],["127.0.0.2",1702433146000,0.2,0.4],["127.0.0.2",1702433151000,0.2,0.4]]}}],"execution_time_ms":13}"#;
let table_response: TableResponse = serde_json::from_str(data).unwrap();
let payload = table_response.to_string();
let expected_payload = r#"┌─host────────┬─ts────────────┬─cpu─┬─memory─┐
│ "127.0.0.1" │ 1702433141000 │ 0.5 │ 0.2 │
│ "127.0.0.1" │ 1702433146000 │ 0.3 │ 0.2 │
│ "127.0.0.1" │ 1702433151000 │ 0.4 │ 0.3 │
│ "127.0.0.2" │ 1702433141000 │ 0.3 │ 0.1 │
│ "127.0.0.2" │ 1702433146000 │ 0.2 │ 0.4 │
│ "127.0.0.2" │ 1702433151000 │ 0.2 │ 0.4 │
└─────────────┴───────────────┴─────┴────────┘
"#;
assert_eq!(payload, expected_payload);
}
}

0 comments on commit 6377982

Please sign in to comment.