From 3a6459131ced5a42d736ffe0a196fc2fabee4d65 Mon Sep 17 00:00:00 2001 From: yccd Date: Tue, 19 Mar 2024 11:32:25 +0800 Subject: [PATCH 1/3] feat: Able to pretty print sql query result in http output --- src/servers/src/http.rs | 31 ++++++ src/servers/src/http/handler.rs | 2 + src/servers/src/http/table_result.rs | 149 +++++++++++++++++++++++++++ 3 files changed, 182 insertions(+) create mode 100644 src/servers/src/http/table_result.rs diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 7a2abf33cf09..f01daa1c37f6 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -49,6 +49,7 @@ use tower_http::decompression::RequestDecompressionLayer; use tower_http::trace::TraceLayer; use self::authorize::AuthState; +use self::table_result::TableResponse; use crate::configurator::ConfiguratorRef; use crate::error::{AlreadyStartedSnafu, Error, HyperSnafu, Result, ToJsonSnafu}; use crate::http::arrow_result::ArrowResponse; @@ -90,6 +91,7 @@ mod dashboard; pub mod error_result; pub mod greptime_result_v1; pub mod influxdb_result_v1; +pub mod table_result; pub const HTTP_API_VERSION: &str = "v1"; pub const HTTP_API_PREFIX: &str = "/v1/"; @@ -254,6 +256,7 @@ pub enum GreptimeQueryOutput { pub enum ResponseFormat { Arrow, Csv, + Table, #[default] GreptimedbV1, InfluxdbV1, @@ -264,6 +267,7 @@ impl ResponseFormat { match s { "arrow" => Some(ResponseFormat::Arrow), "csv" => Some(ResponseFormat::Csv), + "table" => Some(ResponseFormat::Table), "greptimedb_v1" => Some(ResponseFormat::GreptimedbV1), "influxdb_v1" => Some(ResponseFormat::InfluxdbV1), _ => None, @@ -274,6 +278,7 @@ impl ResponseFormat { match self { ResponseFormat::Arrow => "arrow", ResponseFormat::Csv => "csv", + ResponseFormat::Table => "table", ResponseFormat::GreptimedbV1 => "greptimedb_v1", ResponseFormat::InfluxdbV1 => "influxdb_v1", } @@ -328,6 +333,7 @@ impl Display for Epoch { pub enum HttpResponse { Arrow(ArrowResponse), Csv(CsvResponse), + Table(TableResponse), Error(ErrorResponse), GreptimedbV1(GreptimedbV1Response), InfluxdbV1(InfluxdbV1Response), @@ -338,6 +344,7 @@ impl HttpResponse { match self { HttpResponse::Arrow(resp) => resp.with_execution_time(execution_time).into(), HttpResponse::Csv(resp) => resp.with_execution_time(execution_time).into(), + HttpResponse::Table(resp) => resp.with_execution_time(execution_time).into(), HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(), HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(), HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(), @@ -350,6 +357,7 @@ impl IntoResponse for HttpResponse { match self { HttpResponse::Arrow(resp) => resp.into_response(), HttpResponse::Csv(resp) => resp.into_response(), + HttpResponse::Table(resp) => resp.into_response(), HttpResponse::GreptimedbV1(resp) => resp.into_response(), HttpResponse::InfluxdbV1(resp) => resp.into_response(), HttpResponse::Error(resp) => resp.into_response(), @@ -373,6 +381,12 @@ impl From for HttpResponse { } } +impl From for HttpResponse { + fn from(value: TableResponse) -> Self { + HttpResponse::Table(value) + } +} + impl From for HttpResponse { fn from(value: ErrorResponse) -> Self { HttpResponse::Error(value) @@ -971,6 +985,7 @@ mod test { ResponseFormat::GreptimedbV1, ResponseFormat::InfluxdbV1, ResponseFormat::Csv, + ResponseFormat::Table, ResponseFormat::Arrow, ] { let recordbatches = @@ -979,6 +994,7 @@ mod test { let json_resp = match format { ResponseFormat::Arrow => ArrowResponse::from_output(outputs).await, ResponseFormat::Csv => CsvResponse::from_output(outputs).await, + ResponseFormat::Table => TableResponse::from_output(outputs).await, ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await, ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await, }; @@ -1021,6 +1037,21 @@ mod test { panic!("invalid output type"); } } + + HttpResponse::Table(resp) => { + let output = &resp.output()[0]; + if let GreptimeQueryOutput::Records(r) = output { + assert_eq!(r.num_rows(), 4); + assert_eq!(r.num_cols(), 2); + assert_eq!(r.schema.column_schemas[0].name, "numbers"); + assert_eq!(r.schema.column_schemas[0].data_type, "UInt32"); + assert_eq!(r.rows[0][0], serde_json::Value::from(1)); + assert_eq!(r.rows[0][1], serde_json::Value::Null); + } else { + panic!("invalid output type"); + } + } + HttpResponse::Arrow(resp) => { let output = resp.data; let mut reader = diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 7506ebbddbd0..614b69b3d9c9 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -38,6 +38,7 @@ use crate::http::csv_result::CsvResponse; use crate::http::error_result::ErrorResponse; use crate::http::greptime_result_v1::GreptimedbV1Response; use crate::http::influxdb_result_v1::InfluxdbV1Response; +use crate::http::table_result::TableResponse; use crate::http::{ ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput, HttpResponse, ResponseFormat, @@ -119,6 +120,7 @@ pub async fn sql( let resp = match format { ResponseFormat::Arrow => ArrowResponse::from_output(outputs).await, ResponseFormat::Csv => CsvResponse::from_output(outputs).await, + ResponseFormat::Table => TableResponse::from_output(outputs).await, ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await, ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await, }; diff --git a/src/servers/src/http/table_result.rs b/src/servers/src/http/table_result.rs new file mode 100644 index 000000000000..d5851bfcaded --- /dev/null +++ b/src/servers/src/http/table_result.rs @@ -0,0 +1,149 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::max; +use std::fmt::Write; + +use axum::http::{header, HeaderValue}; +use axum::response::{IntoResponse, Response}; +use common_error::status_code::StatusCode; +use common_query::Output; +use itertools::Itertools; +use mime_guess::mime; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::http::error_result::ErrorResponse; +use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; +use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; + +#[derive(Serialize, Deserialize, Debug, JsonSchema)] +pub struct TableResponse { + output: Vec, + execution_time_ms: u64, +} + +impl TableResponse { + pub async fn from_output(outputs: Vec>) -> HttpResponse { + match handler::from_output(ResponseFormat::Csv, outputs).await { + Err(err) => HttpResponse::Error(err), + Ok((output, _)) => { + if output.len() > 1 { + HttpResponse::Error(ErrorResponse::from_error_message( + ResponseFormat::Table, + StatusCode::InvalidArguments, + "Multi-statements are not allowed".to_string(), + )) + } else { + HttpResponse::Table(TableResponse { + output, + execution_time_ms: 0, + }) + } + } + } + } + + pub fn output(&self) -> &[GreptimeQueryOutput] { + &self.output + } + + pub fn with_execution_time(mut self, execution_time: u64) -> Self { + self.execution_time_ms = execution_time; + self + } + + pub fn execution_time_ms(&self) -> u64 { + self.execution_time_ms + } +} + +impl IntoResponse for TableResponse { + fn into_response(mut self) -> Response { + debug_assert!( + self.output.len() <= 1, + "self.output has extra elements: {}", + self.output.len() + ); + + let execution_time = self.execution_time_ms; + + let payload = match self.output.pop() { + None => String::default(), + Some(GreptimeQueryOutput::AffectedRows(n)) => { + format!("{n}\n") + } + Some(GreptimeQueryOutput::Records(records)) => { + let mut max_width = vec![0; records.num_cols()]; + let mut result = String::new(); + // Determine maximum width for each column + for (i, column) in records.schema.column_schemas.iter().enumerate() { + max_width[i] = max(max_width[i], column.name.len()); + } + for row in &records.rows { + for (i, v) in row.iter().enumerate() { + let s = v.to_string(); + max_width[i] = max(max_width[i], s.len()); + } + } + + // Construct the header + let head: String = records + .schema + .column_schemas + .iter() + .enumerate() + .map(|(i, column)| format!("─{:─<1$}─", column.name, max_width[i])) + .join("┬"); + writeln!(result, "┌{}┐", head).unwrap(); + + // Construct rows + for row in &records.rows { + let row = row + .iter() + .enumerate() + .map(|(i, v)| { + let s = v.to_string(); + format!(" {:1$} ", s, max_width[i]) + }) + .join("│"); + writeln!(result, "│{row}│").unwrap(); + } + + // Construct the footer + let footer: String = max_width.iter().map(|v| "─".repeat(*v + 2)).join("┴"); + writeln!(result, "└{}┘", footer).unwrap(); + result + } + }; + + let mut resp = ( + [( + header::CONTENT_TYPE, + HeaderValue::from_static(mime::PLAIN.as_ref()), + )], + payload, + ) + .into_response(); + resp.headers_mut().insert( + &GREPTIME_DB_HEADER_FORMAT, + HeaderValue::from_static("TABLE"), + ); + resp.headers_mut().insert( + &GREPTIME_DB_HEADER_EXECUTION_TIME, + HeaderValue::from(execution_time), + ); + resp + } +} From 0126a60164edf933e35b317cbb1aad3c1d0198f2 Mon Sep 17 00:00:00 2001 From: yccd Date: Tue, 19 Mar 2024 22:47:57 +0800 Subject: [PATCH 2/3] fix: add some tests --- src/servers/src/http/table_result.rs | 53 ++++++++++++++++++++-------- 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/src/servers/src/http/table_result.rs b/src/servers/src/http/table_result.rs index d5851bfcaded..9c72941ba96c 100644 --- a/src/servers/src/http/table_result.rs +++ b/src/servers/src/http/table_result.rs @@ -67,19 +67,8 @@ impl TableResponse { pub fn execution_time_ms(&self) -> u64 { self.execution_time_ms } -} - -impl IntoResponse for TableResponse { - fn into_response(mut self) -> Response { - debug_assert!( - self.output.len() <= 1, - "self.output has extra elements: {}", - self.output.len() - ); - - let execution_time = self.execution_time_ms; - - let payload = match self.output.pop() { + fn into_payload(&mut self) -> String { + match self.output.pop() { None => String::default(), Some(GreptimeQueryOutput::AffectedRows(n)) => { format!("{n}\n") @@ -126,14 +115,26 @@ impl IntoResponse for TableResponse { writeln!(result, "└{}┘", footer).unwrap(); result } - }; + } + } +} + +impl IntoResponse for TableResponse { + fn into_response(mut self) -> Response { + debug_assert!( + self.output.len() <= 1, + "self.output has extra elements: {}", + self.output.len() + ); + + let execution_time = self.execution_time_ms; let mut resp = ( [( header::CONTENT_TYPE, HeaderValue::from_static(mime::PLAIN.as_ref()), )], - payload, + self.into_payload(), ) .into_response(); resp.headers_mut().insert( @@ -147,3 +148,25 @@ impl IntoResponse for TableResponse { resp } } +#[cfg(test)] +mod test { + + use super::TableResponse; + + #[tokio::test] + async fn test_table_format() { + let data = r#"{"output":[{"records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"ts","data_type":"TimestampMillisecond"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"}]},"rows":[["127.0.0.1",1702433141000,0.5,0.2],["127.0.0.1",1702433146000,0.3,0.2],["127.0.0.1",1702433151000,0.4,0.3],["127.0.0.2",1702433141000,0.3,0.1],["127.0.0.2",1702433146000,0.2,0.4],["127.0.0.2",1702433151000,0.2,0.4]]}}],"execution_time_ms":13}"#; + let mut table_response: TableResponse = serde_json::from_str(data).unwrap(); + let payload = table_response.into_payload(); + let expected_payload = r#"┌─host────────┬─ts────────────┬─cpu─┬─memory─┐ +│ "127.0.0.1" │ 1702433141000 │ 0.5 │ 0.2 │ +│ "127.0.0.1" │ 1702433146000 │ 0.3 │ 0.2 │ +│ "127.0.0.1" │ 1702433151000 │ 0.4 │ 0.3 │ +│ "127.0.0.2" │ 1702433141000 │ 0.3 │ 0.1 │ +│ "127.0.0.2" │ 1702433146000 │ 0.2 │ 0.4 │ +│ "127.0.0.2" │ 1702433151000 │ 0.2 │ 0.4 │ +└─────────────┴───────────────┴─────┴────────┘ +"#; + assert_eq!(payload, expected_payload); + } +} From e84e7da30b27f0148e2903d179c65055da9f0192 Mon Sep 17 00:00:00 2001 From: yccd Date: Tue, 19 Mar 2024 23:31:51 +0800 Subject: [PATCH 3/3] fix: add some space, delete fn into_payload, and impl Display for TableResponse --- src/servers/src/http/table_result.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/servers/src/http/table_result.rs b/src/servers/src/http/table_result.rs index 9c72941ba96c..005eec479a8d 100644 --- a/src/servers/src/http/table_result.rs +++ b/src/servers/src/http/table_result.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::cmp::max; -use std::fmt::Write; +use std::fmt::{Display, Write}; use axum::http::{header, HeaderValue}; use axum::response::{IntoResponse, Response}; @@ -67,8 +67,11 @@ impl TableResponse { pub fn execution_time_ms(&self) -> u64 { self.execution_time_ms } - fn into_payload(&mut self) -> String { - match self.output.pop() { +} + +impl Display for TableResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let payload = match self.output.first() { None => String::default(), Some(GreptimeQueryOutput::AffectedRows(n)) => { format!("{n}\n") @@ -115,12 +118,13 @@ impl TableResponse { writeln!(result, "└{}┘", footer).unwrap(); result } - } + }; + write!(f, "{}", payload) } } impl IntoResponse for TableResponse { - fn into_response(mut self) -> Response { + fn into_response(self) -> Response { debug_assert!( self.output.len() <= 1, "self.output has extra elements: {}", @@ -134,7 +138,7 @@ impl IntoResponse for TableResponse { header::CONTENT_TYPE, HeaderValue::from_static(mime::PLAIN.as_ref()), )], - self.into_payload(), + self.to_string(), ) .into_response(); resp.headers_mut().insert( @@ -156,8 +160,8 @@ mod test { #[tokio::test] async fn test_table_format() { let data = r#"{"output":[{"records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"ts","data_type":"TimestampMillisecond"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"}]},"rows":[["127.0.0.1",1702433141000,0.5,0.2],["127.0.0.1",1702433146000,0.3,0.2],["127.0.0.1",1702433151000,0.4,0.3],["127.0.0.2",1702433141000,0.3,0.1],["127.0.0.2",1702433146000,0.2,0.4],["127.0.0.2",1702433151000,0.2,0.4]]}}],"execution_time_ms":13}"#; - let mut table_response: TableResponse = serde_json::from_str(data).unwrap(); - let payload = table_response.into_payload(); + let table_response: TableResponse = serde_json::from_str(data).unwrap(); + let payload = table_response.to_string(); let expected_payload = r#"┌─host────────┬─ts────────────┬─cpu─┬─memory─┐ │ "127.0.0.1" │ 1702433141000 │ 0.5 │ 0.2 │ │ "127.0.0.1" │ 1702433146000 │ 0.3 │ 0.2 │